This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch HDDS-5713
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/HDDS-5713 by this push:
     new 15328df44e HDDS-10332. [Diskbalancer] Include Disk Balancer Report in 
the heartbeat message (#6201)
15328df44e is described below

commit 15328df44e7a3eb50fa62a9893414eb0b721aa63
Author: Stephen O'Donnell <[email protected]>
AuthorDate: Sun Feb 11 21:59:44 2024 +0000

    HDDS-10332. [Diskbalancer] Include Disk Balancer Report in the heartbeat 
message (#6201)
---
 .../states/endpoint/HeartbeatEndpointTask.java     |  7 +++
 .../container/diskbalancer/DiskBalancerInfo.java   | 15 ++++++
 .../states/endpoint/TestHeartbeatEndpointTask.java | 61 ++++++++++------------
 3 files changed, 51 insertions(+), 32 deletions(-)

diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index 0685650361..850f7674af 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -49,6 +49,7 @@ import org.apache.hadoop.ozone.container.common.statemachine
 import org.apache.hadoop.ozone.container.common.statemachine
     .EndpointStateMachine.EndPointStates;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerInfo;
 import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
 import org.apache.hadoop.ozone.protocol.commands.CreatePipelineCommand;
@@ -178,6 +179,7 @@ public class HeartbeatEndpointTask
       addContainerActions(requestBuilder);
       addPipelineActions(requestBuilder);
       addQueuedCommandCounts(requestBuilder);
+      addDiskBalancerReport(requestBuilder);
       SCMHeartbeatRequestProto request = requestBuilder.build();
       LOG.debug("Sending heartbeat message : {}", request);
       SCMHeartbeatResponseProto response = rpcEndpoint.getEndPoint()
@@ -288,6 +290,11 @@ public class HeartbeatEndpointTask
     requestBuilder.setCommandQueueReport(reportProto.build());
   }
 
+  private void addDiskBalancerReport(SCMHeartbeatRequestProto.Builder 
requestBuilder) {
+    DiskBalancerInfo info = 
context.getParent().getContainer().getDiskBalancerInfo();
+    requestBuilder.setDiskBalancerReport(info.toDiskBalancerReportProto());
+  }
+
   /**
    * Returns a builder class for HeartbeatEndpointTask task.
    * @return   Builder.
diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java
index 873c89aad1..086480fa6b 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/diskbalancer/DiskBalancerInfo.java
@@ -16,9 +16,12 @@
  */
 package org.apache.hadoop.ozone.container.diskbalancer;
 
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos;
 import org.apache.hadoop.hdds.scm.storage.DiskBalancerConfiguration;
 
 import java.util.Objects;
+import java.util.Optional;
 
 /**
  * DiskBalancer's information to persist.
@@ -66,6 +69,18 @@ public class DiskBalancerInfo {
     }
   }
 
+  public StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto 
toDiskBalancerReportProto() {
+    DiskBalancerConfiguration conf = new 
DiskBalancerConfiguration(Optional.of(threshold),
+        Optional.of(bandwidthInMB), Optional.of(parallelThread));
+    HddsProtos.DiskBalancerConfigurationProto confProto = 
conf.toProtobufBuilder().build();
+
+    StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto.Builder 
builder =
+        
StorageContainerDatanodeProtocolProtos.DiskBalancerReportProto.newBuilder();
+    builder.setIsRunning(shouldRun);
+    builder.setDiskBalancerConf(confProto);
+    return builder.build();
+  }
+
   public boolean isShouldRun() {
     return shouldRun;
   }
diff --git 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
index 09fa8a9917..270780c205 100644
--- 
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
+++ 
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/states/endpoint/TestHeartbeatEndpointTask.java
@@ -38,7 +38,6 @@ import java.util.OptionalLong;
 import java.util.UUID;
 
 import org.apache.hadoop.hdds.client.ECReplicationConfig;
-import org.apache.hadoop.hdds.conf.ConfigurationSource;
 import org.apache.hadoop.hdds.conf.OzoneConfiguration;
 import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 
@@ -56,9 +55,12 @@ import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachin
 import 
org.apache.hadoop.ozone.container.common.statemachine.DatanodeStateMachine.DatanodeStates;
 import 
org.apache.hadoop.ozone.container.common.statemachine.EndpointStateMachine;
 import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.diskbalancer.DiskBalancerInfo;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import 
org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand;
 import 
org.apache.hadoop.ozone.protocolPB.StorageContainerDatanodeProtocolClientSideTranslatorPB;
 
+import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
 import org.mockito.ArgumentCaptor;
 
@@ -70,6 +72,19 @@ public class TestHeartbeatEndpointTask {
   private static final InetSocketAddress TEST_SCM_ENDPOINT =
       new InetSocketAddress("test-scm-1", 9861);
 
+  private OzoneConfiguration conf;
+  private DatanodeStateMachine datanodeStateMachine;
+  private OzoneContainer container;
+
+  @BeforeEach
+  public void setup() {
+    conf = new OzoneConfiguration();
+    datanodeStateMachine = mock(DatanodeStateMachine.class);
+    container = mock(OzoneContainer.class);
+    when(container.getDiskBalancerInfo()).thenReturn(new 
DiskBalancerInfo(true, 10, 20, 30));
+    when(datanodeStateMachine.getContainer()).thenReturn(container);
+  }
+
   @Test
   public void handlesReconstructContainerCommand() throws Exception {
     StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
@@ -94,14 +109,11 @@ public class TestHeartbeatEndpointTask {
                     .build())
                 .build());
 
-    OzoneConfiguration conf = new OzoneConfiguration();
-    DatanodeStateMachine datanodeStateMachine =
-        mock(DatanodeStateMachine.class);
     StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
         datanodeStateMachine, "");
 
     // WHEN
-    HeartbeatEndpointTask task = getHeartbeatEndpointTask(conf, context, scm);
+    HeartbeatEndpointTask task = getHeartbeatEndpointTask(context, scm);
     task.call();
 
     // THEN
@@ -126,12 +138,10 @@ public class TestHeartbeatEndpointTask {
                 .setTerm(termInSCM)
                 .build());
 
-    OzoneConfiguration conf = new OzoneConfiguration();
     StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
-        mock(DatanodeStateMachine.class), "");
+        datanodeStateMachine, "");
     context.setTermOfLeaderSCM(1);
-    HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
-        conf, context, scm);
+    HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(context, 
scm);
     endpointTask.call();
     SCMHeartbeatRequestProto heartbeat = argument.getValue();
     assertTrue(heartbeat.hasDatanodeDetails());
@@ -146,9 +156,8 @@ public class TestHeartbeatEndpointTask {
 
   @Test
   public void testheartbeatWithNodeReports() throws Exception {
-    OzoneConfiguration conf = new OzoneConfiguration();
     StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
-        mock(DatanodeStateMachine.class), "");
+        datanodeStateMachine, "");
 
     StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
         mock(
@@ -163,8 +172,7 @@ public class TestHeartbeatEndpointTask {
                         .getDatanodeDetails().getUuid())
                 .build());
 
-    HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
-        conf, context, scm);
+    HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(context, 
scm);
     context.addEndpoint(TEST_SCM_ENDPOINT);
     context.refreshFullReport(NodeReportProto.getDefaultInstance());
     endpointTask.call();
@@ -178,9 +186,8 @@ public class TestHeartbeatEndpointTask {
 
   @Test
   public void testheartbeatWithContainerReports() throws Exception {
-    OzoneConfiguration conf = new OzoneConfiguration();
     StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
-        mock(DatanodeStateMachine.class), "");
+        datanodeStateMachine, "");
 
     StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
         mock(
@@ -195,8 +202,7 @@ public class TestHeartbeatEndpointTask {
                         .getDatanodeDetails().getUuid())
                 .build());
 
-    HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
-        conf, context, scm);
+    HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(context, 
scm);
     context.addEndpoint(TEST_SCM_ENDPOINT);
     context.refreshFullReport(ContainerReportsProto.getDefaultInstance());
     endpointTask.call();
@@ -210,9 +216,8 @@ public class TestHeartbeatEndpointTask {
 
   @Test
   public void testheartbeatWithCommandStatusReports() throws Exception {
-    OzoneConfiguration conf = new OzoneConfiguration();
     StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
-        mock(DatanodeStateMachine.class), "");
+        datanodeStateMachine, "");
 
     StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
         mock(
@@ -227,8 +232,7 @@ public class TestHeartbeatEndpointTask {
                         .getDatanodeDetails().getUuid())
                 .build());
 
-    HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
-        conf, context, scm);
+    HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(context, 
scm);
     context.addEndpoint(TEST_SCM_ENDPOINT);
     context.addIncrementalReport(
         CommandStatusReportsProto.getDefaultInstance());
@@ -243,9 +247,8 @@ public class TestHeartbeatEndpointTask {
 
   @Test
   public void testheartbeatWithContainerActions() throws Exception {
-    OzoneConfiguration conf = new OzoneConfiguration();
     StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
-        mock(DatanodeStateMachine.class), "");
+        datanodeStateMachine, "");
 
     StorageContainerDatanodeProtocolClientSideTranslatorPB scm =
         mock(
@@ -260,8 +263,7 @@ public class TestHeartbeatEndpointTask {
                         .getDatanodeDetails().getUuid())
                 .build());
 
-    HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
-        conf, context, scm);
+    HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(context, 
scm);
     context.addEndpoint(TEST_SCM_ENDPOINT);
     context.addContainerAction(getContainerAction());
     endpointTask.call();
@@ -275,9 +277,6 @@ public class TestHeartbeatEndpointTask {
 
   @Test
   public void testheartbeatWithAllReports() throws Exception {
-    OzoneConfiguration conf = new OzoneConfiguration();
-    DatanodeStateMachine datanodeStateMachine =
-        mock(DatanodeStateMachine.class);
     StateContext context = new StateContext(conf, DatanodeStates.RUNNING,
         datanodeStateMachine, "");
 
@@ -303,8 +302,7 @@ public class TestHeartbeatEndpointTask {
                         .getDatanodeDetails().getUuid())
                 .build());
 
-    HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(
-        conf, context, scm);
+    HeartbeatEndpointTask endpointTask = getHeartbeatEndpointTask(context, 
scm);
     context.addEndpoint(TEST_SCM_ENDPOINT);
     context.refreshFullReport(NodeReportProto.getDefaultInstance());
     context.refreshFullReport(ContainerReportsProto.getDefaultInstance());
@@ -326,20 +324,19 @@ public class TestHeartbeatEndpointTask {
       assertEquals(commands.get(queueCount.getCommand(i)).intValue(),
           queueCount.getCount(i));
     }
+    assertTrue(heartbeat.hasDiskBalancerReport());
   }
 
   /**
    * Creates HeartbeatEndpointTask with the given conf, context and
    * StorageContainerManager client side proxy.
    *
-   * @param conf Configuration
    * @param context StateContext
    * @param proxy StorageContainerDatanodeProtocolClientSideTranslatorPB
    *
    * @return HeartbeatEndpointTask
    */
   private HeartbeatEndpointTask getHeartbeatEndpointTask(
-      ConfigurationSource conf,
       StateContext context,
       StorageContainerDatanodeProtocolClientSideTranslatorPB proxy) {
     DatanodeDetails datanodeDetails = DatanodeDetails.newBuilder()


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to