This is an automated email from the ASF dual-hosted git repository. sshenoy pushed a commit to branch HDDS-5713 in repository https://gitbox.apache.org/repos/asf/ozone.git
commit 45cd13338040f04f49c9ee766b1dfb8cf189eb92 Author: Stephen O'Donnell <[email protected]> AuthorDate: Sun Feb 11 21:59:44 2024 +0000 HDDS-10332. [Diskbalancer] Include Disk Balancer Report in the heartbeat message (#6201) --- .../states/endpoint/HeartbeatEndpointTask.java | 7 +++ .../container/diskbalancer/DiskBalancerInfo.java | 15 ++++++ .../states/endpoint/TestHeartbeatEndpointTask.java | 61 ++++++++++------------ 3 files changed, 51 insertions(+), 32 deletions(-) diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java index 0685650361..850f7674af 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java @@ -49,6 +49,7 @@ import org.apache.hadoop.ozone.container.common.statemachine import org.apache.hadoop.ozone.container.common.statemachine .EndpointStateMachine.EndPointStates; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerInfo; import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand; import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand; import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand; @@ -178,6 +179,7 @@ public class HeartbeatEndpointTask addContainerActions(requestBuilder); addPipelineActions(requestBuilder); addQueuedCommandCounts(requestBuilder); + addDiskBalancerReport(requestBuilder); SCMHeartbeatRequestProto request = requestBuilder.build(); LOG.debug("Sending heartbeat message : {}", request); SCMHeartbeatResponseProto response = rpcEndpoint.getEndPoint() @@ -288,6 +290,11 @@ public class HeartbeatEndpointTask requestBuilder.setCommandQueueReport(reportProto.build()); } + private void addDiskBalancerReport(SCMHeartbeatRequestProto.Builder requestBuilder) { + DiskBalancerInfo info = context.getParent().getContainer().getDiskBalancerInfo(); + requestBuilder.setDiskBalancerReport(info.toDiskBalancerReportProto()); + } + /** * Returns a builder class for HeartbeatEndpointTask task. * @return Builder. diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java index 873c89aad1..086480fa6b 100644 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java @@ -16,9 +16,12 @@ */ package org.apache.hadoop.ozone.container.diskbalancer; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration; import java.util.Objects; +import java.util.Optional; /** * DiskBalancer's information to persist. @@ -66,6 +69,18 @@ public class DiskBalancerInfo { } } + public StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto toDiskBalancerReportProto() { + DiskBalancerConfiguration conf = new DiskBalancerConfiguration(Optional.of(threshold), + Optional.of(bandwidthInMB), Optional.of(parallelThread)); + HddsProtos.DiskBalancerConfigurationProto confProto = conf.toProtobufBuilder().build(); + + StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto.Builder builder = + StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto.newBuilder(); + builder.setIsRunning(shouldRun); + builder.setDiskBalancerConf(confProto); + return builder.build(); + } + public boolean isShouldRun() { return shouldRun; } diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java index 7d8b94e57d..1a71c0e3e0 100644 --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java @@ -39,7 +39,6 @@ import java.util.UUID; import com.google.protobuf.Proto2Utils; import org.apache.hadoop.hdds.client.ECReplicationConfig; -import org.apache.hadoop.hdds.conf.ConfigurationSource; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@ -57,9 +56,12 @@ import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachin import org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine.DatanodeStates; import org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine; import org.apache.hadoop.ozone.container.common.statemachine.StateContext; +import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerInfo; +import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer; import org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand; import org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolClientSideTranslatorPB; +import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.mockito.ArgumentCaptor; @@ -71,6 +73,19 @@ public class TestHeartbeatEndpointTask { private static final InetSocketAddress TEST_SCM_ENDPOINT = new InetSocketAddress("test-scm-1", 9861); + private OzoneConfiguration conf; + private DatanodeStateMachine datanodeStateMachine; + private OzoneContainer container; + + @BeforeEach + public void setup() { + conf = new OzoneConfiguration(); + datanodeStateMachine = mock(DatanodeStateMachine.class); + container = mock(OzoneContainer.class); + when(container.getDiskBalancerInfo()).thenReturn(new DiskBalancerInfo(true, 10, 20, 30)); + when(datanodeStateMachine.getContainer()).thenReturn(container); + } + @Test public void handlesReconstructContainerCommand() throws Exception { StorageContainerDatanodeProtocolClientSideTranslatorPB scm = @@ -96,14 +111,11 @@ public class TestHeartbeatEndpointTask { .build()) .build()); - OzoneConfiguration conf = new OzoneConfiguration(); - DatanodeStateMachine datanodeStateMachine = - mock(DatanodeStateMachine.class); StateContext context = new StateContext(conf, DatanodeStates.RUNNING, datanodeStateMachine, ""); // WHEN - HeartbeatEndpointTask task = getHeartbeatEndpointTask(conf, context, scm); + HeartbeatEndpointTask task = getHeartbeatEndpointTask(context, scm); task.call(); // THEN @@ -128,12 +140,10 @@ public class TestHeartbeatEndpointTask { .setTerm(termInSCM) .build()); - OzoneConfiguration conf = new OzoneConfiguration(); StateContext context = new StateContext(conf, DatanodeStates.RUNNING, - mock(DatanodeStateMachine.class), ""); + datanodeStateMachine, ""); context.setTermOfLeaderSCM(1); - HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask( - conf, context, scm); + HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(context, scm); endpointTask.call(); SCMHeartbeatRequestProto heartbeat = argument.getValue(); assertTrue(heartbeat.hasDatanodeDetails()); @@ -148,9 +158,8 @@ public class TestHeartbeatEndpointTask { @Test public void testheartbeatWithNodeReports() throws Exception { - OzoneConfiguration conf = new OzoneConfiguration(); StateContext context = new StateContext(conf, DatanodeStates.RUNNING, - mock(DatanodeStateMachine.class), ""); + datanodeStateMachine, ""); StorageContainerDatanodeProtocolClientSideTranslatorPB scm = mock( @@ -165,8 +174,7 @@ public class TestHeartbeatEndpointTask { .getDatanodeDetails().getUuid()) .build()); - HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask( - conf, context, scm); + HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(context, scm); context.addEndpoint(TEST_SCM_ENDPOINT); context.refreshFullReport(NodeReportProto.getDefaultInstance()); endpointTask.call(); @@ -180,9 +188,8 @@ public class TestHeartbeatEndpointTask { @Test public void testheartbeatWithContainerReports() throws Exception { - OzoneConfiguration conf = new OzoneConfiguration(); StateContext context = new StateContext(conf, DatanodeStates.RUNNING, - mock(DatanodeStateMachine.class), ""); + datanodeStateMachine, ""); StorageContainerDatanodeProtocolClientSideTranslatorPB scm = mock( @@ -197,8 +204,7 @@ public class TestHeartbeatEndpointTask { .getDatanodeDetails().getUuid()) .build()); - HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask( - conf, context, scm); + HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(context, scm); context.addEndpoint(TEST_SCM_ENDPOINT); context.refreshFullReport(ContainerReportsProto.getDefaultInstance()); endpointTask.call(); @@ -212,9 +218,8 @@ public class TestHeartbeatEndpointTask { @Test public void testheartbeatWithCommandStatusReports() throws Exception { - OzoneConfiguration conf = new OzoneConfiguration(); StateContext context = new StateContext(conf, DatanodeStates.RUNNING, - mock(DatanodeStateMachine.class), ""); + datanodeStateMachine, ""); StorageContainerDatanodeProtocolClientSideTranslatorPB scm = mock( @@ -229,8 +234,7 @@ public class TestHeartbeatEndpointTask { .getDatanodeDetails().getUuid()) .build()); - HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask( - conf, context, scm); + HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(context, scm); context.addEndpoint(TEST_SCM_ENDPOINT); context.addIncrementalReport( CommandStatusReportsProto.getDefaultInstance()); @@ -245,9 +249,8 @@ public class TestHeartbeatEndpointTask { @Test public void testheartbeatWithContainerActions() throws Exception { - OzoneConfiguration conf = new OzoneConfiguration(); StateContext context = new StateContext(conf, DatanodeStates.RUNNING, - mock(DatanodeStateMachine.class), ""); + datanodeStateMachine, ""); StorageContainerDatanodeProtocolClientSideTranslatorPB scm = mock( @@ -262,8 +265,7 @@ public class TestHeartbeatEndpointTask { .getDatanodeDetails().getUuid()) .build()); - HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask( - conf, context, scm); + HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(context, scm); context.addEndpoint(TEST_SCM_ENDPOINT); context.addContainerAction(getContainerAction()); endpointTask.call(); @@ -277,9 +279,6 @@ public class TestHeartbeatEndpointTask { @Test public void testheartbeatWithAllReports() throws Exception { - OzoneConfiguration conf = new OzoneConfiguration(); - DatanodeStateMachine datanodeStateMachine = - mock(DatanodeStateMachine.class); StateContext context = new StateContext(conf, DatanodeStates.RUNNING, datanodeStateMachine, ""); @@ -305,8 +304,7 @@ public class TestHeartbeatEndpointTask { .getDatanodeDetails().getUuid()) .build()); - HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask( - conf, context, scm); + HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(context, scm); context.addEndpoint(TEST_SCM_ENDPOINT); context.refreshFullReport(NodeReportProto.getDefaultInstance()); context.refreshFullReport(ContainerReportsProto.getDefaultInstance()); @@ -328,20 +326,19 @@ public class TestHeartbeatEndpointTask { assertEquals(commands.get(queueCount.getCommand(i)).intValue(), queueCount.getCount(i)); } + assertTrue(heartbeat.hasDiskBalancerReport()); } /** * Creates HeartbeatEndpointTask with the given conf, context and * StorageContainerManager client side proxy. * - * @param conf Configuration * @param context StateContext * @param proxy StorageContainerDatanodeProtocolClientSideTranslatorPB * * @return HeartbeatEndpointTask */ private HeartbeatEndpointTask getHeartbeatEndpointTask( - ConfigurationSource conf, StateContext context, StorageContainerDatanodeProtocolClientSideTranslatorPB proxy) { DatanodeDetails datanodeDetails = DatanodeDetails.newBuilder() --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
