This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 0152eb5 HDDS-4404. Datanode can go OOM when a Recon or SCM Server is very slow in processing reports (#1601)
0152eb5 is described below
commit 0152eb5a728aaccf23cefccdd4f406b5fd45b6ca
Author: Siyao Meng <[email protected]>
AuthorDate: Mon Dec 14 03:28:22 2020 -0800
HDDS-4404. Datanode can go OOM when a Recon or SCM Server is very slow in processing reports (#1601)
* HDDS-4404. Datanode can go OOM when a Recon or SCM Server is very slow in
processing reports.
Change-Id: Idf6394d0bde2fb1c9fe99a96c49fa28677e0e527
* Checkstyle.
Change-Id: I6c4729634905bf6bba5bb6483c393d3c7511a78e
* Add mock-maker-inline so UT can mock final classes.
Change-Id: I63470510ed2089438f08f4d3cea321c1b5f01c3c
* Remove throw exception in addReport.
Change-Id: Id3ff6222ed279568d32876afc0a6a5ee1316c926
* Fix existing UT TestStateContext#testReportAPIs.
Change-Id: I25020b4411f4163a2ec134431793bfe663e5f6bf
* Remove unused imports.
Change-Id: I2dc93c42fc7468b37d41a179ca047ee385fc8896
* Work around a mockito bug in UT TestCreatePipelineCommandHandler caused by
adding mock-maker-inline.
Change-Id: I0f21c508af1f7b36d7632aaa3b9173a17056f211
* Add UT testContainerNodePipelineReportAPIs.
Change-Id: I8d67a1b6e2d9043fb1479e2547d2e690932f7985
* Clean up.
Change-Id: Iaec544ba6fb211d0d71fe03b26b0fc70199dc882
* Mark the container/node/pipeline report getters @VisibleForTesting, as they
are only used in UTs, at least for now.
Change-Id: I72070fde8df220ecebf2fb6d1015d19f65cee11e
* - Renamed `reports` to `incrementalReportsQueue` to better reflect the
functionality of the map.
- Added `acceptedIncrementalReportTypeSet` for rigorous report type
checking.
- `StateContext#addReport` now explicitly throws, instead of hiding any
possible NPEs. Rationale: on reflection, those reports are all internally
generated rather than dependent on direct user input, so it's better to catch
any problem early.
- `StateContext#addReport` also strictly checks the report type now. This
is intended to force any future report types to be explicitly allowed.
Only 5 report types (names) are expected:
```
hadoop.hdds.NodeReportProto
hadoop.hdds.ContainerReportsProto
hadoop.hdds.IncrementalContainerReportProto
hadoop.hdds.CommandStatusReportsProto
hadoop.hdds.PipelineReportsProto
```
- `StateContext#putBackReports` now checks ALL reports in the list to be
put back. I'd expect the list to be small. And since the logic is run on each
DataNode, it shouldn't become a performance bottleneck. Also added debug
logging just in case we need to diagnose it in the future.
- Added new UTs `testPutBackReports` and `testReportQueueWithAddReports` to
check that `putBackReports` and `getReports` behave as expected.
Change-Id: I48aca3d4a422d52532b39c9462a30519c9cbe0eb
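The buffering rules in the bullets above can be modeled in a few lines. This is a simplified, hypothetical stand-in for `StateContext`, not Ozone code: the class `LatestFullReportBuffer`, its `Report` type (a protobuf type name plus a sequence number instead of a `GeneratedMessage`), and the accessors `latestContainerReport` and `queuedIncremental` are all assumptions made for illustration. It shows why memory for full reports becomes bounded: each full-report type has a single `AtomicReference` slot that a newer report overwrites, while incremental reports are still queued per endpoint and unknown types are rejected.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified model of the report buffering in this commit.
// Reports are reduced to a type name plus a sequence number.
class LatestFullReportBuffer {
  static final String CONTAINER = "hadoop.hdds.ContainerReportsProto";
  static final String NODE = "hadoop.hdds.NodeReportProto";
  static final String PIPELINE = "hadoop.hdds.PipelineReportsProto";
  static final String CMD_STATUS = "hadoop.hdds.CommandStatusReportsProto";
  static final String ICR = "hadoop.hdds.IncrementalContainerReportProto";
  static final Set<String> INCREMENTAL =
      new HashSet<>(Arrays.asList(CMD_STATUS, ICR));

  static final class Report {
    final String type;
    final int seq;

    Report(String type, int seq) {
      this.type = type;
      this.seq = seq;
    }
  }

  // One slot per full-report type: a new report replaces the previous one.
  private final AtomicReference<Report> containerReports = new AtomicReference<>();
  private final AtomicReference<Report> nodeReport = new AtomicReference<>();
  private final AtomicReference<Report> pipelineReports = new AtomicReference<>();
  // Incremental reports are queued per endpoint and never coalesced.
  private final Map<String, List<Report>> incrementalQueue = new HashMap<>();

  void addEndpoint(String endpoint) {
    synchronized (incrementalQueue) {
      incrementalQueue.putIfAbsent(endpoint, new LinkedList<>());
    }
  }

  void addReport(Report report) {
    if (report == null) {
      return; // null input is ignored
    }
    String type = report.type;
    if (CONTAINER.equals(type)) {
      containerReports.set(report);
    } else if (NODE.equals(type)) {
      nodeReport.set(report);
    } else if (PIPELINE.equals(type)) {
      pipelineReports.set(report);
    } else if (INCREMENTAL.contains(type)) {
      synchronized (incrementalQueue) {
        for (List<Report> queue : incrementalQueue.values()) {
          queue.add(report);
        }
      }
    } else {
      // Unknown types are rejected so new report types must be allowed explicitly.
      throw new IllegalArgumentException(
          "Unidentified report message type: " + type);
    }
  }

  Report latestContainerReport() {
    return containerReports.get();
  }

  int queuedIncremental(String endpoint) {
    synchronized (incrementalQueue) {
      List<Report> queue = incrementalQueue.get(endpoint);
      return queue == null ? 0 : queue.size();
    }
  }
}
```

Adding 128 container reports leaves only the last one, while 128 incremental reports stay queued for every endpoint. One detail differs from the diff: there, the type dispatch sits inside the `for (InetSocketAddress endpoint : endpoints)` loop, so with no registered endpoint even full reports are dropped and an unknown type does not throw; the sketch dispatches once, outside any loop. The incremental queue is not capped, neither in the sketch nor, as far as this diff shows, in `StateContext`.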
* Make containerReports, nodeReport, pipelineReports atomic and final.
Change-Id: I44aba0ff093fa0ff5f9a35a74ff4126d9c73dd7b
* Checkstyle.
Change-Id: I545ca8c636b8bbd1239a936603b6efcbfbd10b88
* Clean up.
Change-Id: I5cdc38a297cf3d91bad8335eb36f2ba6b2009c10
* Remove assertion in putBackReports as requestBuilder might include
Container/Node/Pipeline reports.
Thanks Aravindan.
Change-Id: Ic39f53de8bb91a179bd5dc3301aa89ffecf5c076
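The put-back path described in the commit message above has two steps: `HeartbeatEndpointTask` collects only the incremental reports from a failed request, and `StateContext#putBackReports` validates every entry before prepending the batch to the endpoint's queue. A minimal sketch under simplifying assumptions (reports reduced to type-name strings; `PutBackSketch`, `selectForPutBack` and `putBack` are hypothetical names, not Ozone APIs):

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

// Hypothetical sketch: reports are reduced to their protobuf type names.
class PutBackSketch {
  static final Set<String> INCREMENTAL = new HashSet<>(Arrays.asList(
      "hadoop.hdds.CommandStatusReportsProto",
      "hadoop.hdds.IncrementalContainerReportProto"));

  // Heartbeat side: keep only incremental reports from the failed request.
  // Full reports are skipped because the latest copy is retained elsewhere.
  static List<String> selectForPutBack(List<String> sentReports) {
    List<String> out = new ArrayList<>();
    for (String type : sentReports) {
      if (INCREMENTAL.contains(type)) {
        out.add(type);
      }
    }
    return out;
  }

  // Context side: validate every entry first, then prepend the whole batch
  // so re-queued reports keep their order ahead of newer ones.
  static void putBack(List<String> queue, List<String> toPutBack) {
    for (String type : toPutBack) {
      if (!INCREMENTAL.contains(type)) {
        throw new IllegalArgumentException(
            "Unaccepted report message type: " + type);
      }
    }
    queue.addAll(0, toPutBack);
  }
}
```

Validating the whole list before calling `addAll(0, ...)` keeps the operation all-or-nothing: a rejected batch leaves the queue untouched.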
* Empty commit to retrigger CI.
Change-Id: I46fb65ce2c81cb607dd10a9ee829d829ec7a8cfc
* Empty commit to retrigger CI.
Change-Id: I9d3b5ce0b81d457344ecd6b4ad6e141169bd89d3
* Retrigger.
Change-Id: I4e9dace08152e21c2bb5da564487d776a84c4ade
* Allow null as input to StateContext#addReport(), since rejecting it broke UT
TestStorageContainerManager#testBlockDeletionTransactions.
Change-Id: I16c8bfcfbd619a23241ea6dda9b95265714984bb
* Retrigger CI
Change-Id: I64203ebbaeebdb4c598bb8dad578579c82bf604b
* trigger new CI check
Co-authored-by: Doroszlai, Attila <[email protected]>
---
.../common/statemachine/StateContext.java | 143 +++++++++--
.../states/endpoint/HeartbeatEndpointTask.java | 16 +-
.../common/statemachine/TestStateContext.java | 286 ++++++++++++++++++++-
.../TestCreatePipelineCommandHandler.java | 6 +-
.../org.mockito.plugins.MockMaker | 16 ++
5 files changed, 435 insertions(+), 32 deletions(-)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index 4cd769f..f87561a 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -32,15 +32,23 @@ import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.collect.Sets;
+import com.google.protobuf.Descriptors.Descriptor;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus.Status;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatusReportsProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineAction;
+import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.ozone.container.common.states.DatanodeState;
import org.apache.hadoop.ozone.container.common.states.datanode.InitDatanodeState;
@@ -63,6 +71,27 @@ import org.slf4j.LoggerFactory;
* Current Context of State Machine.
*/
public class StateContext {
+
+ @VisibleForTesting
+ final static String CONTAINER_REPORTS_PROTO_NAME =
+ ContainerReportsProto.getDescriptor().getFullName();
+ @VisibleForTesting
+ final static String NODE_REPORT_PROTO_NAME =
+ NodeReportProto.getDescriptor().getFullName();
+ @VisibleForTesting
+ final static String PIPELINE_REPORTS_PROTO_NAME =
+ PipelineReportsProto.getDescriptor().getFullName();
+ @VisibleForTesting
+ final static String COMMAND_STATUS_REPORTS_PROTO_NAME =
+ CommandStatusReportsProto.getDescriptor().getFullName();
+ @VisibleForTesting
+ final static String INCREMENTAL_CONTAINER_REPORT_PROTO_NAME =
+ IncrementalContainerReportProto.getDescriptor().getFullName();
+ // Accepted types of reports that can be queued to incrementalReportsQueue
+ private final static Set<String> ACCEPTED_INCREMENTAL_REPORT_TYPE_SET =
+ Sets.newHashSet(COMMAND_STATUS_REPORTS_PROTO_NAME,
+ INCREMENTAL_CONTAINER_REPORT_PROTO_NAME);
+
static final Logger LOG =
LoggerFactory.getLogger(StateContext.class);
private final Queue<SCMCommand> commandQueue;
@@ -72,7 +101,13 @@ public class StateContext {
private final AtomicLong stateExecutionCount;
private final ConfigurationSource conf;
private final Set<InetSocketAddress> endpoints;
- private final Map<InetSocketAddress, List<GeneratedMessage>> reports;
+ // Only the latest full report of each type is kept
+ private final AtomicReference<GeneratedMessage> containerReports;
+ private final AtomicReference<GeneratedMessage> nodeReport;
+ private final AtomicReference<GeneratedMessage> pipelineReports;
+ // Incremental reports are queued in the map below
+ private final Map<InetSocketAddress, List<GeneratedMessage>>
+ incrementalReportsQueue;
private final Map<InetSocketAddress, Queue<ContainerAction>>
containerActions;
private final Map<InetSocketAddress, Queue<PipelineAction>> pipelineActions;
private DatanodeStateMachine.DatanodeStates state;
@@ -102,7 +137,10 @@ public class StateContext {
this.parent = parent;
commandQueue = new LinkedList<>();
cmdStatusMap = new ConcurrentHashMap<>();
- reports = new HashMap<>();
+ incrementalReportsQueue = new HashMap<>();
+ containerReports = new AtomicReference<>();
+ nodeReport = new AtomicReference<>();
+ pipelineReports = new AtomicReference<>();
endpoints = new HashSet<>();
containerActions = new HashMap<>();
pipelineActions = new HashMap<>();
@@ -190,17 +228,34 @@ public class StateContext {
public boolean getShutdownOnError() {
return shutdownOnError;
}
+
/**
* Adds the report to report queue.
*
* @param report report to be added
*/
public void addReport(GeneratedMessage report) {
- if (report != null) {
- synchronized (reports) {
- for (InetSocketAddress endpoint : endpoints) {
- reports.get(endpoint).add(report);
+ if (report == null) {
+ return;
+ }
+ final Descriptor descriptor = report.getDescriptorForType();
+ Preconditions.checkState(descriptor != null);
+ final String reportType = descriptor.getFullName();
+ Preconditions.checkState(reportType != null);
+ for (InetSocketAddress endpoint : endpoints) {
+ if (reportType.equals(CONTAINER_REPORTS_PROTO_NAME)) {
+ containerReports.set(report);
+ } else if (reportType.equals(NODE_REPORT_PROTO_NAME)) {
+ nodeReport.set(report);
+ } else if (reportType.equals(PIPELINE_REPORTS_PROTO_NAME)) {
+ pipelineReports.set(report);
+ } else if (ACCEPTED_INCREMENTAL_REPORT_TYPE_SET.contains(reportType)) {
+ synchronized (incrementalReportsQueue) {
+ incrementalReportsQueue.get(endpoint).add(report);
}
+ } else {
+ throw new IllegalArgumentException(
+ "Unidentified report message type: " + reportType);
}
}
}
@@ -214,9 +269,24 @@ public class StateContext {
*/
public void putBackReports(List<GeneratedMessage> reportsToPutBack,
InetSocketAddress endpoint) {
- synchronized (reports) {
- if (reports.containsKey(endpoint)){
- reports.get(endpoint).addAll(0, reportsToPutBack);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("endpoint: {}, size of reportsToPutBack: {}",
+ endpoint, reportsToPutBack.size());
+ }
+ // We don't expect too many reports to be put back
+ for (GeneratedMessage report : reportsToPutBack) {
+ final Descriptor descriptor = report.getDescriptorForType();
+ Preconditions.checkState(descriptor != null);
+ final String reportType = descriptor.getFullName();
+ Preconditions.checkState(reportType != null);
+ if (!ACCEPTED_INCREMENTAL_REPORT_TYPE_SET.contains(reportType)) {
+ throw new IllegalArgumentException(
+ "Unaccepted report message type: " + reportType);
+ }
+ }
+ synchronized (incrementalReportsQueue) {
+ if (incrementalReportsQueue.containsKey(endpoint)){
+ incrementalReportsQueue.get(endpoint).addAll(0, reportsToPutBack);
}
}
}
@@ -232,6 +302,22 @@ public class StateContext {
return getReports(endpoint, Integer.MAX_VALUE);
}
+ List<GeneratedMessage> getIncrementalReports(
+ InetSocketAddress endpoint, int maxLimit) {
+ List<GeneratedMessage> reportsToReturn = new LinkedList<>();
+ synchronized (incrementalReportsQueue) {
+ List<GeneratedMessage> reportsForEndpoint =
+ incrementalReportsQueue.get(endpoint);
+ if (reportsForEndpoint != null) {
+ List<GeneratedMessage> tempList = reportsForEndpoint.subList(
+ 0, min(reportsForEndpoint.size(), maxLimit));
+ reportsToReturn.addAll(tempList);
+ tempList.clear();
+ }
+ }
+ return reportsToReturn;
+ }
+
/**
* Returns available reports from the report queue with a max limit on
* list size, or empty list if the queue is empty.
@@ -240,15 +326,19 @@ public class StateContext {
*/
public List<GeneratedMessage> getReports(InetSocketAddress endpoint,
int maxLimit) {
- List<GeneratedMessage> reportsToReturn = new LinkedList<>();
- synchronized (reports) {
- List<GeneratedMessage> reportsForEndpoint = reports.get(endpoint);
- if (reportsForEndpoint != null) {
- List<GeneratedMessage> tempList = reportsForEndpoint.subList(
- 0, min(reportsForEndpoint.size(), maxLimit));
- reportsToReturn.addAll(tempList);
- tempList.clear();
- }
+ List<GeneratedMessage> reportsToReturn =
+ getIncrementalReports(endpoint, maxLimit);
+ GeneratedMessage report = containerReports.get();
+ if (report != null) {
+ reportsToReturn.add(report);
+ }
+ report = nodeReport.get();
+ if (report != null) {
+ reportsToReturn.add(report);
+ }
+ report = pipelineReports.get();
+ if (report != null) {
+ reportsToReturn.add(report);
}
return reportsToReturn;
}
@@ -580,7 +670,22 @@ public class StateContext {
this.endpoints.add(endpoint);
this.containerActions.put(endpoint, new LinkedList<>());
this.pipelineActions.put(endpoint, new LinkedList<>());
- this.reports.put(endpoint, new LinkedList<>());
+ this.incrementalReportsQueue.put(endpoint, new LinkedList<>());
}
}
+
+ @VisibleForTesting
+ public GeneratedMessage getContainerReports() {
+ return containerReports.get();
+ }
+
+ @VisibleForTesting
+ public GeneratedMessage getNodeReport() {
+ return nodeReport.get();
+ }
+
+ @VisibleForTesting
+ public GeneratedMessage getPipelineReports() {
+ return pipelineReports.get();
+ }
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index cea4295..4e436c4 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -142,15 +142,15 @@ public class HeartbeatEndpointTask
if (LOG.isDebugEnabled()) {
LOG.debug("Sending heartbeat message :: {}", request.toString());
}
- SCMHeartbeatResponseProto reponse = rpcEndpoint.getEndPoint()
+ SCMHeartbeatResponseProto response = rpcEndpoint.getEndPoint()
.sendHeartbeat(request);
- processResponse(reponse, datanodeDetailsProto);
+ processResponse(response, datanodeDetailsProto);
rpcEndpoint.setLastSuccessfulHeartbeat(ZonedDateTime.now());
rpcEndpoint.zeroMissedCount();
} catch (IOException ex) {
+ Preconditions.checkState(requestBuilder != null);
// put back the reports which failed to be sent
putBackReports(requestBuilder);
-
rpcEndpoint.logIfNeeded(ex);
} finally {
rpcEndpoint.unlock();
@@ -161,12 +161,9 @@ public class HeartbeatEndpointTask
// TODO: Make it generic.
private void putBackReports(SCMHeartbeatRequestProto.Builder requestBuilder)
{
List<GeneratedMessage> reports = new LinkedList<>();
- if (requestBuilder.hasContainerReport()) {
- reports.add(requestBuilder.getContainerReport());
- }
- if (requestBuilder.hasNodeReport()) {
- reports.add(requestBuilder.getNodeReport());
- }
+ // We only put back CommandStatusReports and IncrementalContainerReport
+ // because those are incremental. Container/Node/PipelineReport are
+ // accumulative so we can keep only the latest of each.
if (requestBuilder.getCommandStatusReportsCount() != 0) {
reports.addAll(requestBuilder.getCommandStatusReportsList());
}
@@ -194,6 +191,7 @@ public class HeartbeatEndpointTask
} else {
requestBuilder.setField(descriptor, report);
}
+ break;
}
}
}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
index d3032c3..e9c39d3 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
@@ -23,11 +23,19 @@ import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProt
import static org.apache.hadoop.test.GenericTestUtils.waitFor;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.when;
import java.net.InetSocketAddress;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ExecutorService;
@@ -37,6 +45,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import com.google.protobuf.Descriptors.Descriptor;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineAction;
@@ -53,6 +62,271 @@ import com.google.protobuf.GeneratedMessage;
*/
public class TestStateContext {
+ /**
+ * Only accepted types of reports can be put back to the report queue.
+ */
+ @Test
+ public void testPutBackReports() {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ DatanodeStateMachine datanodeStateMachineMock =
+ mock(DatanodeStateMachine.class);
+
+ StateContext ctx = new StateContext(conf, DatanodeStates.getInitState(),
+ datanodeStateMachineMock);
+ InetSocketAddress scm1 = new InetSocketAddress("scm1", 9001);
+ ctx.addEndpoint(scm1);
+ InetSocketAddress scm2 = new InetSocketAddress("scm2", 9001);
+ ctx.addEndpoint(scm2);
+
+ Map<String, Integer> expectedReportCount = new HashMap<>();
+
+ // Case 1: Put back an incremental report
+
+ ctx.putBackReports(Collections.singletonList(newMockReport(
+ StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME)), scm1);
+ // scm2 report queue should be empty
+ checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount);
+ // Check scm1 queue
+ expectedReportCount.put(
+ StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME, 1);
+ checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount);
+ // getReports dequeues incremental reports
+ expectedReportCount.clear();
+
+ ctx.putBackReports(Collections.singletonList(newMockReport(
+ StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME)), scm2);
+ // scm1 report queue should be empty
+ checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount);
+ // Check scm2 queue
+ expectedReportCount.put(
+ StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME, 1);
+ checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount);
+ // getReports dequeues incremental reports
+ expectedReportCount.clear();
+
+ // Case 2: Attempt to put back a full report
+
+ try {
+ ctx.putBackReports(Collections.singletonList(
+ newMockReport(StateContext.CONTAINER_REPORTS_PROTO_NAME)), scm1);
+ fail("Should throw exception when putting back unaccepted reports!");
+ } catch (IllegalArgumentException ignored) {
+ }
+ try {
+ ctx.putBackReports(Collections.singletonList(
+ newMockReport(StateContext.NODE_REPORT_PROTO_NAME)), scm2);
+ fail("Should throw exception when putting back unaccepted reports!");
+ } catch (IllegalArgumentException ignored) {
+ }
+ try {
+ ctx.putBackReports(Collections.singletonList(
+ newMockReport(StateContext.PIPELINE_REPORTS_PROTO_NAME)), scm1);
+ fail("Should throw exception when putting back unaccepted reports!");
+ } catch (IllegalArgumentException ignored) {
+ }
+
+ // Case 3: Put back mixed types of incremental reports
+
+ ctx.putBackReports(Arrays.asList(
+ newMockReport(StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME),
+ newMockReport(StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME),
+ newMockReport(StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME),
+ newMockReport(StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME),
+ newMockReport(StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME)
+ ), scm1);
+ // scm2 report queue should be empty
+ checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount);
+ // Check scm1 queue
+ expectedReportCount.put(
+ StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME, 2);
+ expectedReportCount.put(
+ StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME, 3);
+ checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount);
+ // getReports dequeues incremental reports
+ expectedReportCount.clear();
+
+ // Case 4: Attempt to put back mixed types of full reports
+
+ try {
+ ctx.putBackReports(Arrays.asList(
+ newMockReport(StateContext.CONTAINER_REPORTS_PROTO_NAME),
+ newMockReport(StateContext.NODE_REPORT_PROTO_NAME),
+ newMockReport(StateContext.PIPELINE_REPORTS_PROTO_NAME)
+ ), scm1);
+ fail("Should throw exception when putting back unaccepted reports!");
+ } catch (IllegalArgumentException ignored) {
+ }
+
+ // Case 5: Attempt to put back mixed full and incremental reports
+
+ try {
+ ctx.putBackReports(Arrays.asList(
+ newMockReport(StateContext.CONTAINER_REPORTS_PROTO_NAME),
+ newMockReport(StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME),
+ newMockReport(StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME)
+ ), scm2);
+ fail("Should throw exception when putting back unaccepted reports!");
+ } catch (IllegalArgumentException ignored) {
+ }
+ }
+
+ @Test
+ public void testReportQueueWithAddReports() {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ DatanodeStateMachine datanodeStateMachineMock =
+ mock(DatanodeStateMachine.class);
+
+ StateContext ctx = new StateContext(conf, DatanodeStates.getInitState(),
+ datanodeStateMachineMock);
+ InetSocketAddress scm1 = new InetSocketAddress("scm1", 9001);
+ ctx.addEndpoint(scm1);
+ InetSocketAddress scm2 = new InetSocketAddress("scm2", 9001);
+ ctx.addEndpoint(scm2);
+ // Check initial state
+ assertEquals(0, ctx.getAllAvailableReports(scm1).size());
+ assertEquals(0, ctx.getAllAvailableReports(scm2).size());
+
+ Map<String, Integer> expectedReportCount = new HashMap<>();
+
+ // Add a bunch of ContainerReports
+ batchAddReports(ctx, StateContext.CONTAINER_REPORTS_PROTO_NAME, 128);
+ // Should only keep the latest one
+ expectedReportCount.put(StateContext.CONTAINER_REPORTS_PROTO_NAME, 1);
+ checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount);
+ checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount);
+
+ // Add a bunch of NodeReport
+ batchAddReports(ctx, StateContext.NODE_REPORT_PROTO_NAME, 128);
+ // Should only keep the latest one
+ expectedReportCount.put(StateContext.NODE_REPORT_PROTO_NAME, 1);
+ checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount);
+ checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount);
+
+ // Add a bunch of PipelineReports
+ batchAddReports(ctx, StateContext.PIPELINE_REPORTS_PROTO_NAME, 128);
+ // Should only keep the latest one
+ expectedReportCount.put(StateContext.PIPELINE_REPORTS_PROTO_NAME, 1);
+ checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount);
+ checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount);
+
+ // Add a bunch of PipelineReports
+ batchAddReports(ctx, StateContext.PIPELINE_REPORTS_PROTO_NAME, 128);
+ // Should only keep the latest one
+ expectedReportCount.put(StateContext.PIPELINE_REPORTS_PROTO_NAME, 1);
+ checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount);
+ checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount);
+
+ // Add a bunch of CommandStatusReports
+ batchAddReports(ctx,
+ StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME, 128);
+ expectedReportCount.put(
+ StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME, 128);
+ // Should keep all of them
+ checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount);
+ checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount);
+ // getReports dequeues incremental reports
+ expectedReportCount.remove(
+ StateContext.COMMAND_STATUS_REPORTS_PROTO_NAME);
+
+ // Add a bunch of IncrementalContainerReport
+ batchAddReports(ctx,
+ StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME, 128);
+ // Should keep all of them
+ expectedReportCount.put(
+ StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME, 128);
+ checkReportCount(ctx.getAllAvailableReports(scm1), expectedReportCount);
+ checkReportCount(ctx.getAllAvailableReports(scm2), expectedReportCount);
+ // getReports dequeues incremental reports
+ expectedReportCount.remove(
+ StateContext.INCREMENTAL_CONTAINER_REPORT_PROTO_NAME);
+ }
+
+ void batchAddReports(StateContext ctx, String reportName, int count) {
+ for (int i = 0; i < count; i++) {
+ ctx.addReport(newMockReport(reportName));
+ }
+ }
+
+ void checkReportCount(List<GeneratedMessage> reports,
+ Map<String, Integer> expectedReportCount) {
+ Map<String, Integer> reportCount = new HashMap<>();
+ for (GeneratedMessage report : reports) {
+ final String reportName = report.getDescriptorForType().getFullName();
+ reportCount.put(reportName, reportCount.getOrDefault(reportName, 0) + 1);
+ }
+ // Verify
+ assertEquals(expectedReportCount, reportCount);
+ }
+
+ /**
+ * Check if Container, Node and Pipeline report APIs work as expected.
+ */
+ @Test
+ public void testContainerNodePipelineReportAPIs() {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ DatanodeStateMachine datanodeStateMachineMock =
+ mock(DatanodeStateMachine.class);
+
+ // ContainerReports
+ StateContext context1 = newStateContext(conf, datanodeStateMachineMock);
+ assertNull(context1.getContainerReports());
+ assertNull(context1.getNodeReport());
+ assertNull(context1.getPipelineReports());
+ GeneratedMessage containerReports =
+ newMockReport(StateContext.CONTAINER_REPORTS_PROTO_NAME);
+ context1.addReport(containerReports);
+
+ assertNotNull(context1.getContainerReports());
+ assertEquals(StateContext.CONTAINER_REPORTS_PROTO_NAME,
+ context1.getContainerReports().getDescriptorForType().getFullName());
+ assertNull(context1.getNodeReport());
+ assertNull(context1.getPipelineReports());
+
+ // NodeReport
+ StateContext context2 = newStateContext(conf, datanodeStateMachineMock);
+ GeneratedMessage nodeReport =
+ newMockReport(StateContext.NODE_REPORT_PROTO_NAME);
+ context2.addReport(nodeReport);
+
+ assertNull(context2.getContainerReports());
+ assertNotNull(context2.getNodeReport());
+ assertEquals(StateContext.NODE_REPORT_PROTO_NAME,
+ context2.getNodeReport().getDescriptorForType().getFullName());
+ assertNull(context2.getPipelineReports());
+
+ // PipelineReports
+ StateContext context3 = newStateContext(conf, datanodeStateMachineMock);
+ GeneratedMessage pipelineReports =
+ newMockReport(StateContext.PIPELINE_REPORTS_PROTO_NAME);
+ context3.addReport(pipelineReports);
+
+ assertNull(context3.getContainerReports());
+ assertNull(context3.getNodeReport());
+ assertNotNull(context3.getPipelineReports());
+ assertEquals(StateContext.PIPELINE_REPORTS_PROTO_NAME,
+ context3.getPipelineReports().getDescriptorForType().getFullName());
+ }
+
+ private StateContext newStateContext(OzoneConfiguration conf,
+ DatanodeStateMachine datanodeStateMachineMock) {
+ StateContext stateContext = new StateContext(conf,
+ DatanodeStates.getInitState(), datanodeStateMachineMock);
+ InetSocketAddress scm1 = new InetSocketAddress("scm1", 9001);
+ stateContext.addEndpoint(scm1);
+ InetSocketAddress scm2 = new InetSocketAddress("scm2", 9001);
+ stateContext.addEndpoint(scm2);
+ return stateContext;
+ }
+
+ private GeneratedMessage newMockReport(String messageType) {
+ GeneratedMessage pipelineReports = mock(GeneratedMessage.class);
+ when(pipelineReports.getDescriptorForType()).thenReturn(
+ mock(Descriptor.class));
+ when(pipelineReports.getDescriptorForType().getFullName()).thenReturn(
+ messageType);
+ return pipelineReports;
+ }
+
@Test
public void testReportAPIs() {
OzoneConfiguration conf = new OzoneConfiguration();
@@ -64,8 +338,14 @@ public class TestStateContext {
InetSocketAddress scm1 = new InetSocketAddress("scm1", 9001);
InetSocketAddress scm2 = new InetSocketAddress("scm2", 9001);
- // Try to add report with endpoint. Should not be stored.
- stateContext.addReport(mock(GeneratedMessage.class));
+ GeneratedMessage generatedMessage = mock(GeneratedMessage.class);
+ when(generatedMessage.getDescriptorForType()).thenReturn(
+ mock(Descriptor.class));
+ when(generatedMessage.getDescriptorForType().getFullName()).thenReturn(
+ "hadoop.hdds.CommandStatusReportsProto");
+
+ // Try to add report with zero endpoint. Should not be stored.
+ stateContext.addReport(generatedMessage);
assertTrue(stateContext.getAllAvailableReports(scm1).isEmpty());
// Add 2 scm endpoints.
@@ -73,7 +353,7 @@ public class TestStateContext {
stateContext.addEndpoint(scm2);
// Add report. Should be added to all endpoints.
- stateContext.addReport(mock(GeneratedMessage.class));
+ stateContext.addReport(generatedMessage);
List<GeneratedMessage> allAvailableReports =
stateContext.getAllAvailableReports(scm1);
assertEquals(1, allAvailableReports.size());
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCreatePipelineCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCreatePipelineCommandHandler.java
index febd1c3..d23f1c4 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCreatePipelineCommandHandler.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCreatePipelineCommandHandler.java
@@ -44,6 +44,7 @@ import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.mockito.Mockito;
+import org.mockito.stubbing.Answer;
import org.powermock.api.mockito.PowerMockito;
import org.powermock.core.classloader.annotations.PrepareForTest;
import org.powermock.modules.junit4.PowerMockRunner;
@@ -79,7 +80,10 @@ public class TestCreatePipelineCommandHandler {
Mockito.when(raftClient.getGroupManagementApi(
Mockito.any(RaftPeerId.class))).thenReturn(raftClientGroupManager);
PowerMockito.mockStatic(RaftClient.class);
- PowerMockito.when(RaftClient.newBuilder()).thenReturn(builder);
+ // Work around for mockito bug:
+ // https://github.com/powermock/powermock/issues/992
+ PowerMockito.when(RaftClient.newBuilder()).thenAnswer(
+ (Answer<RaftClient.Builder>) invocation -> builder);
}
private RaftClient.Builder mockRaftClientBuilder() {
diff --git a/hadoop-hdds/container-service/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker b/hadoop-hdds/container-service/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
new file mode 100644
index 0000000..3c9e1c8
--- /dev/null
+++ b/hadoop-hdds/container-service/src/test/resources/mockito-extensions/org.mockito.plugins.MockMaker
@@ -0,0 +1,16 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+mock-maker-inline
\ No newline at end of file
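`getIncrementalReports` in the `StateContext` diff above dequeues at most `maxLimit` reports by copying a `subList` prefix and then clearing that view, which removes the same elements from the backing list. A standalone sketch of the idiom follows; the class `BoundedDequeue` and method `drain` are hypothetical names, and, as in the diff, the caller is assumed to hold whatever lock guards the queue (the diff uses `synchronized (incrementalReportsQueue)`).

```java
import java.util.LinkedList;
import java.util.List;

// Hypothetical helper illustrating the bounded-dequeue idiom.
// Not thread-safe on its own: callers must synchronize externally.
class BoundedDequeue {
  static <T> List<T> drain(List<T> queue, int maxLimit) {
    List<T> out = new LinkedList<>();
    // A view over the first min(size, maxLimit) elements of the queue.
    List<T> head = queue.subList(0, Math.min(queue.size(), maxLimit));
    out.addAll(head);
    // Clearing the view removes those elements from the backing queue.
    head.clear();
    return out;
  }
}
```

Draining `[1, 2, 3, 4, 5]` with a limit of 2 returns `[1, 2]` and leaves `[3, 4, 5]` queued; passing `Integer.MAX_VALUE`, as `getAllAvailableReports` does via `getReports`, drains everything.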