This is an automated email from the ASF dual-hosted git repository.

szetszwo pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new 2139367b52 HDDS-11581. Remove duplicate 
ContainerStateMachine#RaftGroupId (#7312)
2139367b52 is described below

commit 2139367b52d68773f25d79abce0439323dde9671
Author: jianghuazhu <[email protected]>
AuthorDate: Tue Oct 15 07:46:13 2024 +0800

    HDDS-11581. Remove duplicate ContainerStateMachine#RaftGroupId (#7312)
---
 .../server/ratis/ContainerStateMachine.java        | 76 +++++++++++-----------
 1 file changed, 37 insertions(+), 39 deletions(-)

diff --git 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
index 3a48ecaee5..1048ec5092 100644
--- 
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
+++ 
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/transport/server/ratis/ContainerStateMachine.java
@@ -183,7 +183,6 @@ public class ContainerStateMachine extends BaseStateMachine 
{
 
   private final SimpleStateMachineStorage storage =
       new SimpleStateMachineStorage();
-  private final RaftGroupId gid;
   private final ContainerDispatcher dispatcher;
   private final ContainerController containerController;
   private final XceiverServerRatis ratisServer;
@@ -218,7 +217,6 @@ public class ContainerStateMachine extends BaseStateMachine 
{
       ConfigurationSource conf,
       String threadNamePrefix) {
     this.datanodeService = hddsDatanodeService;
-    this.gid = gid;
     this.dispatcher = dispatcher;
     this.containerController = containerController;
     this.ratisServer = ratisServer;
@@ -282,7 +280,7 @@ public class ContainerStateMachine extends BaseStateMachine 
{
       throws IOException {
     super.initialize(server, id, raftStorage);
     storage.init(raftStorage);
-    ratisServer.notifyGroupAdd(gid);
+    ratisServer.notifyGroupAdd(id);
 
     LOG.info("{}: initialize {}", server.getId(), id);
     loadSnapshot(storage.getLatestSnapshot());
@@ -293,7 +291,7 @@ public class ContainerStateMachine extends BaseStateMachine 
{
     if (snapshot == null) {
       TermIndex empty = TermIndex.valueOf(0, RaftLog.INVALID_LOG_INDEX);
       LOG.info("{}: The snapshot info is null. Setting the last applied index 
" +
-              "to:{}", gid, empty);
+              "to:{}", getGroupId(), empty);
       setLastAppliedTermIndex(empty);
       return empty.getIndex();
     }
@@ -301,7 +299,7 @@ public class ContainerStateMachine extends BaseStateMachine 
{
     final File snapshotFile = snapshot.getFile().getPath().toFile();
     final TermIndex last =
         SimpleStateMachineStorage.getTermIndexFromSnapshotFile(snapshotFile);
-    LOG.info("{}: Setting the last applied index to {}", gid, last);
+    LOG.info("{}: Setting the last applied index to {}", getGroupId(), last);
     setLastAppliedTermIndex(last);
 
     // initialize the dispatcher with snapshot so that it build the missing
@@ -351,7 +349,7 @@ public class ContainerStateMachine extends BaseStateMachine 
{
     long startTime = Time.monotonicNow();
     if (!isStateMachineHealthy()) {
       String msg =
-          "Failed to take snapshot " + " for " + gid + " as the stateMachine"
+          "Failed to take snapshot " + " for " + getGroupId() + " as the 
stateMachine"
               + " is unhealthy. The last applied index is at " + ti;
       StateMachineException sme = new StateMachineException(msg);
       LOG.error(msg);
@@ -360,19 +358,19 @@ public class ContainerStateMachine extends 
BaseStateMachine {
     if (ti != null && ti.getIndex() != RaftLog.INVALID_LOG_INDEX) {
       final File snapshotFile =
           storage.getSnapshotFile(ti.getTerm(), ti.getIndex());
-      LOG.info("{}: Taking a snapshot at:{} file {}", gid, ti, snapshotFile);
+      LOG.info("{}: Taking a snapshot at:{} file {}", getGroupId(), ti, 
snapshotFile);
       try (FileOutputStream fos = new FileOutputStream(snapshotFile)) {
         persistContainerSet(fos);
         fos.flush();
         // make sure the snapshot file is synced
         fos.getFD().sync();
       } catch (IOException ioe) {
-        LOG.error("{}: Failed to write snapshot at:{} file {}", gid, ti,
+        LOG.error("{}: Failed to write snapshot at:{} file {}", getGroupId(), 
ti,
             snapshotFile);
         throw ioe;
       }
       LOG.info("{}: Finished taking a snapshot at:{} file:{} took: {} ms",
-          gid, ti, snapshotFile, (Time.monotonicNow() - startTime));
+              getGroupId(), ti, snapshotFile, (Time.monotonicNow() - 
startTime));
       return ti.getIndex();
     }
     return -1;
@@ -386,7 +384,7 @@ public class ContainerStateMachine extends BaseStateMachine 
{
     final StateMachineLogEntryProto stateMachineLogEntry = 
entry.getStateMachineLogEntry();
     final ContainerCommandRequestProto logProto;
     try {
-      logProto = getContainerCommandRequestProto(gid, 
stateMachineLogEntry.getLogData());
+      logProto = getContainerCommandRequestProto(getGroupId(), 
stateMachineLogEntry.getLogData());
     } catch (InvalidProtocolBufferException e) {
       trx.setException(e);
       return trx;
@@ -413,7 +411,7 @@ public class ContainerStateMachine extends BaseStateMachine 
{
     long startTime = Time.monotonicNowNanos();
     final ContainerCommandRequestProto proto =
         message2ContainerCommandRequestProto(request.getMessage());
-    Preconditions.checkArgument(request.getRaftGroupId().equals(gid));
+    Preconditions.checkArgument(request.getRaftGroupId().equals(getGroupId()));
 
     final TransactionContext.Builder builder = TransactionContext.newBuilder()
         .setClientRequest(request)
@@ -449,7 +447,7 @@ public class ContainerStateMachine extends BaseStateMachine 
{
         final WriteChunkRequestProto.Builder commitWriteChunkProto = 
WriteChunkRequestProto.newBuilder(write)
             .clearData();
         protoBuilder.setWriteChunk(commitWriteChunkProto)
-            .setPipelineID(gid.getUuid().toString())
+            .setPipelineID(getGroupId().getUuid().toString())
             .setTraceID(proto.getTraceID());
 
         builder.setStateMachineData(write.getData());
@@ -491,20 +489,20 @@ public class ContainerStateMachine extends 
BaseStateMachine {
 
   private ContainerCommandRequestProto message2ContainerCommandRequestProto(
       Message message) throws InvalidProtocolBufferException {
-    return ContainerCommandRequestMessage.toProto(message.getContent(), gid);
+    return ContainerCommandRequestMessage.toProto(message.getContent(), 
getGroupId());
   }
 
   private ContainerCommandResponseProto dispatchCommand(
       ContainerCommandRequestProto requestProto, DispatcherContext context) {
     if (LOG.isTraceEnabled()) {
-      LOG.trace("{}: dispatch {} containerID={} pipelineID={} traceID={}", gid,
+      LOG.trace("{}: dispatch {} containerID={} pipelineID={} traceID={}", 
getGroupId(),
           requestProto.getCmdType(), requestProto.getContainerID(),
           requestProto.getPipelineID(), requestProto.getTraceID());
     }
     ContainerCommandResponseProto response =
         dispatcher.dispatch(requestProto, context);
     if (LOG.isTraceEnabled()) {
-      LOG.trace("{}: response {}", gid, response);
+      LOG.trace("{}: response {}", getGroupId(), response);
     }
     return response;
   }
@@ -531,7 +529,7 @@ public class ContainerStateMachine extends BaseStateMachine 
{
     RaftServer server = ratisServer.getServer();
     Preconditions.checkArgument(!write.getData().isEmpty());
     try {
-      if (server.getDivision(gid).getInfo().isLeader()) {
+      if (server.getDivision(getGroupId()).getInfo().isLeader()) {
         stateMachineDataCache.put(entryIndex, write.getData());
       }
     } catch (InterruptedException ioe) {
@@ -559,7 +557,7 @@ public class ContainerStateMachine extends BaseStateMachine 
{
             return dispatchCommand(requestProto, context);
           } catch (Exception e) {
             LOG.error("{}: writeChunk writeStateMachineData failed: blockId" +
-                "{} logIndex {} chunkName {}", gid, write.getBlockID(),
+                "{} logIndex {} chunkName {}", getGroupId(), 
write.getBlockID(),
                 entryIndex, write.getChunkData().getChunkName(), e);
             metrics.incNumWriteDataFails();
             // write chunks go in parallel. It's possible that one write chunk
@@ -573,7 +571,7 @@ public class ContainerStateMachine extends BaseStateMachine 
{
     writeChunkFutureMap.put(entryIndex, writeChunkFuture);
     if (LOG.isDebugEnabled()) {
       LOG.debug("{}: writeChunk writeStateMachineData : blockId" +
-              "{} logIndex {} chunkName {}", gid, write.getBlockID(),
+              "{} logIndex {} chunkName {}", getGroupId(), write.getBlockID(),
           entryIndex, write.getChunkData().getChunkName());
     }
     // Remove the future once it finishes execution from the
@@ -587,7 +585,7 @@ public class ContainerStateMachine extends BaseStateMachine 
{
           && r.getResult() != ContainerProtos.Result.CHUNK_FILE_INCONSISTENCY) 
{
         StorageContainerException sce =
             new StorageContainerException(r.getMessage(), r.getResult());
-        LOG.error(gid + ": writeChunk writeStateMachineData failed: blockId" +
+        LOG.error(getGroupId() + ": writeChunk writeStateMachineData failed: 
blockId" +
             write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
             write.getChunkData().getChunkName() + " Error message: " +
             r.getMessage() + " Container Result: " + r.getResult());
@@ -601,7 +599,7 @@ public class ContainerStateMachine extends BaseStateMachine 
{
         metrics.incNumBytesWrittenCount(
             requestProto.getWriteChunk().getChunkData().getLen());
         if (LOG.isDebugEnabled()) {
-          LOG.debug(gid +
+          LOG.debug(getGroupId() +
               ": writeChunk writeStateMachineData  completed: blockId" +
               write.getBlockID() + " logIndex " + entryIndex + " chunkName " +
               write.getChunkData().getChunkName());
@@ -622,7 +620,7 @@ public class ContainerStateMachine extends BaseStateMachine 
{
           DispatcherContext context) throws StorageContainerException {
     if (LOG.isDebugEnabled()) {
       LOG.debug("{}: getStreamDataChannel {} containerID={} pipelineID={} " +
-                      "traceID={}", gid, requestProto.getCmdType(),
+                      "traceID={}", getGroupId(), requestProto.getCmdType(),
               requestProto.getContainerID(), requestProto.getPipelineID(),
               requestProto.getTraceID());
     }
@@ -781,7 +779,7 @@ public class ContainerStateMachine extends BaseStateMachine 
{
           new StorageContainerException(response.getMessage(),
               response.getResult());
       LOG.error("gid {} : ReadStateMachine failed. cmd {} logIndex {} msg : "
-              + "{} Container Result: {}", gid, response.getCmdType(), index,
+              + "{} Container Result: {}", getGroupId(), 
response.getCmdType(), index,
           response.getMessage(), response.getResult());
       stateMachineHealthy.set(false);
       throw sce;
@@ -856,7 +854,7 @@ public class ContainerStateMachine extends BaseStateMachine 
{
           .map(TransactionContext::getStateMachineContext)
           .orElse(null);
       final ContainerCommandRequestProto requestProto = context != null ? 
context.getLogProto()
-          : getContainerCommandRequestProto(gid, 
entry.getStateMachineLogEntry().getLogData());
+          : getContainerCommandRequestProto(getGroupId(), 
entry.getStateMachineLogEntry().getLogData());
 
       if (requestProto.getCmdType() != Type.WriteChunk) {
         throw new IllegalStateException("Cmd type:" + requestProto.getCmdType()
@@ -874,7 +872,7 @@ public class ContainerStateMachine extends BaseStateMachine 
{
       return future;
     } catch (Exception e) {
       metrics.incNumReadStateMachineFails();
-      LOG.error("{} unable to read stateMachineData:", gid, e);
+      LOG.error("{} unable to read stateMachineData:", getGroupId(), e);
       return completeExceptionally(e);
     }
   }
@@ -920,7 +918,7 @@ public class ContainerStateMachine extends BaseStateMachine 
{
     // from `HddsDatanodeService.stop()`, otherwise, it indicates this `close` 
originates from ratis.
     if (allServer) {
       if (datanodeService != null && !datanodeService.isStopped()) {
-        LOG.info("{} is closed by ratis", gid);
+        LOG.info("{} is closed by ratis", getGroupId());
         if (semaphore.tryAcquire()) {
           // run with a different thread, so this raft group can be closed
           Runnable runnable = () -> {
@@ -952,7 +950,7 @@ public class ContainerStateMachine extends BaseStateMachine 
{
           CompletableFuture.runAsync(runnable);
         }
       } else {
-        LOG.info("{} is closed by HddsDatanodeService", gid);
+        LOG.info("{} is closed by HddsDatanodeService", getGroupId());
       }
     }
   }
@@ -983,7 +981,7 @@ public class ContainerStateMachine extends BaseStateMachine 
{
   private void removeStateMachineDataIfNeeded(long index) {
     if (waitOnBothFollowers) {
       try {
-        RaftServer.Division division = 
ratisServer.getServer().getDivision(gid);
+        RaftServer.Division division = 
ratisServer.getServer().getDivision(getGroupId());
         if (division.getInfo().isLeader()) {
           long minIndex = Arrays.stream(division.getInfo()
               .getFollowerNextIndices()).min().getAsLong();
@@ -1041,7 +1039,7 @@ public class ContainerStateMachine extends 
BaseStateMachine {
       CompletableFuture<Message> applyTransactionFuture =
           new CompletableFuture<>();
       final Consumer<Throwable> exceptionHandler = e -> {
-        LOG.error(gid + ": failed to applyTransaction at logIndex " + index
+        LOG.error(getGroupId() + ": failed to applyTransaction at logIndex " + 
index
             + " for " + requestProto.getCmdType(), e);
         stateMachineHealthy.compareAndSet(true, false);
         metrics.incNumApplyTransactionsFails();
@@ -1069,7 +1067,7 @@ public class ContainerStateMachine extends 
BaseStateMachine {
               new StorageContainerException(r.getMessage(), r.getResult());
           LOG.error(
               "gid {} : ApplyTransaction failed. cmd {} logIndex {} msg : "
-                  + "{} Container Result: {}", gid, r.getCmdType(), index,
+                  + "{} Container Result: {}", getGroupId(), r.getCmdType(), 
index,
               r.getMessage(), r.getResult());
           metrics.incNumApplyTransactionsFails();
           // Since the applyTransaction now is completed exceptionally,
@@ -1078,12 +1076,12 @@ public class ContainerStateMachine extends 
BaseStateMachine {
           // shutdown.
           applyTransactionFuture.completeExceptionally(sce);
           stateMachineHealthy.compareAndSet(true, false);
-          ratisServer.handleApplyTransactionFailure(gid, trx.getServerRole());
+          ratisServer.handleApplyTransactionFailure(getGroupId(), 
trx.getServerRole());
         } else {
           if (LOG.isDebugEnabled()) {
             LOG.debug(
                 "gid {} : ApplyTransaction completed. cmd {} logIndex {} msg : 
"
-                    + "{} Container Result: {}", gid, r.getCmdType(), index,
+                    + "{} Container Result: {}", getGroupId(), r.getCmdType(), 
index,
                 r.getMessage(), r.getResult());
           }
           if (cmdType == Type.WriteChunk || cmdType == Type.PutSmallFile) {
@@ -1161,25 +1159,25 @@ public class ContainerStateMachine extends 
BaseStateMachine {
 
   @Override
   public void notifyFollowerSlowness(RoleInfoProto roleInfoProto, RaftPeer 
follower) {
-    ratisServer.handleFollowerSlowness(gid, roleInfoProto, follower);
+    ratisServer.handleFollowerSlowness(getGroupId(), roleInfoProto, follower);
   }
 
   @Override
   public void notifyExtendedNoLeader(RoleInfoProto roleInfoProto) {
-    ratisServer.handleNoLeader(gid, roleInfoProto);
+    ratisServer.handleNoLeader(getGroupId(), roleInfoProto);
   }
 
   @Override
   public void notifyLogFailed(Throwable t, LogEntryProto failedEntry) {
-    LOG.error("{}: {} {}", gid, TermIndex.valueOf(failedEntry),
+    LOG.error("{}: {} {}", getGroupId(), TermIndex.valueOf(failedEntry),
         toStateMachineLogEntryString(failedEntry.getStateMachineLogEntry()), 
t);
-    ratisServer.handleNodeLogFailure(gid, t);
+    ratisServer.handleNodeLogFailure(getGroupId(), t);
   }
 
   @Override
   public CompletableFuture<TermIndex> notifyInstallSnapshotFromLeader(
       RoleInfoProto roleInfoProto, TermIndex firstTermIndexInLog) {
-    ratisServer.handleInstallSnapshotFromLeader(gid, roleInfoProto,
+    ratisServer.handleInstallSnapshotFromLeader(getGroupId(), roleInfoProto,
         firstTermIndexInLog);
     final CompletableFuture<TermIndex> future = new CompletableFuture<>();
     future.complete(firstTermIndexInLog);
@@ -1188,7 +1186,7 @@ public class ContainerStateMachine extends 
BaseStateMachine {
 
   @Override
   public void notifyGroupRemove() {
-    ratisServer.notifyGroupRemove(gid);
+    ratisServer.notifyGroupRemove(getGroupId());
     // Make best effort to quasi-close all the containers on group removal.
     // Containers already in terminal state like CLOSED or UNHEALTHY will not
     // be affected.
@@ -1196,7 +1194,7 @@ public class ContainerStateMachine extends 
BaseStateMachine {
       try {
         containerController.markContainerForClose(cid);
         containerController.quasiCloseContainer(cid,
-            "Ratis group removed. Group id: " + gid);
+            "Ratis group removed. Group id: " + getGroupId());
       } catch (IOException e) {
         LOG.debug("Failed to quasi-close container {}", cid);
       }
@@ -1218,7 +1216,7 @@ public class ContainerStateMachine extends 
BaseStateMachine {
 
   @Override
   public String toStateMachineLogEntryString(StateMachineLogEntryProto proto) {
-    return smProtoToString(gid, containerController, proto);
+    return smProtoToString(getGroupId(), containerController, proto);
   }
 
   public static String smProtoToString(RaftGroupId gid,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to