This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git


The following commit(s) were added to refs/heads/master by this push:
     new d6f63bf2e5 HDDS-7619. Update SCM term in datanode when command is received (#4072)
d6f63bf2e5 is described below

commit d6f63bf2e53ac8a4220ee829d8bbb2427dd7d3fa
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Tue Dec 13 14:27:07 2022 +0100

    HDDS-7619. Update SCM term in datanode when command is received (#4072)
---
 .../common/statemachine/StateContext.java          | 34 ++++++---
 .../common/statemachine/TestStateContext.java      | 85 +++++++++++++++-------
 2 files changed, 83 insertions(+), 36 deletions(-)

diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index eb44b3b8b6..3ed7ee4ae6 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -24,7 +24,7 @@ import java.util.HashSet;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
-import java.util.Optional;
+import java.util.OptionalLong;
 import java.util.Queue;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
@@ -142,7 +142,7 @@ public class StateContext {
    *
    * For non-HA mode, term of SCMCommand will be 0.
    */
-  private Optional<Long> termOfLeaderSCM = Optional.empty();
+  private OptionalLong termOfLeaderSCM = OptionalLong.empty();
 
   /**
    * Starting with a 2 sec heartbeat frequency which will be updated to the
@@ -720,10 +720,9 @@ public class StateContext {
 
     // if commandQueue is not empty, init termOfLeaderSCM
     // with the largest term found in commandQueue
-    commandQueue.stream()
+    termOfLeaderSCM = commandQueue.stream()
         .mapToLong(SCMCommand::getTerm)
-        .max()
-        .ifPresent(term -> termOfLeaderSCM = Optional.of(term));
+        .max();
   }
 
   /**
@@ -732,11 +731,24 @@ public class StateContext {
    */
   private void updateTermOfLeaderSCM(SCMCommand<?> command) {
     if (!termOfLeaderSCM.isPresent()) {
-      LOG.error("should init termOfLeaderSCM before update it.");
       return;
     }
-    termOfLeaderSCM = Optional.of(
-        Long.max(termOfLeaderSCM.get(), command.getTerm()));
+
+    final long currentTerm = termOfLeaderSCM.getAsLong();
+    final long newTerm = command.getTerm();
+    if (currentTerm < newTerm) {
+      setTermOfLeaderSCM(newTerm);
+    }
+  }
+
+  @VisibleForTesting
+  void setTermOfLeaderSCM(long term) {
+    termOfLeaderSCM = OptionalLong.of(term);
+  }
+
+  @VisibleForTesting
+  OptionalLong getTermOfLeaderSCM() {
+    return termOfLeaderSCM;
   }
 
   /**
@@ -759,13 +771,14 @@ public class StateContext {
         }
 
         updateTermOfLeaderSCM(command);
-        if (command.getTerm() == termOfLeaderSCM.get()) {
+        final long currentTerm = termOfLeaderSCM.getAsLong();
+        if (command.getTerm() == currentTerm) {
           return command;
         }
 
         LOG.warn("Detect and drop a SCMCommand {} from stale leader SCM," +
             " stale term {}, latest term {}.",
-            command, command.getTerm(), termOfLeaderSCM.get());
+            command, command.getTerm(), currentTerm);
       }
     } finally {
       lock.unlock();
@@ -780,6 +793,7 @@ public class StateContext {
   public void addCommand(SCMCommand command) {
     lock.lock();
     try {
+      updateTermOfLeaderSCM(command);
       commandQueue.add(command);
     } finally {
       lock.unlock();
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
index 5c5ee4e705..331d9f2bd9 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
@@ -19,6 +19,7 @@
 package org.apache.hadoop.ozone.container.common.statemachine;
 
 import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
+import static java.util.Collections.emptyList;
 import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ClosePipelineInfo;
 import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction.Action.CLOSE;
 import static org.apache.ozone.test.GenericTestUtils.waitFor;
@@ -37,6 +38,7 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.OptionalLong;
 import java.util.concurrent.Callable;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.Executors;
@@ -62,6 +64,7 @@ import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
 import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
 import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
 import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
 import org.apache.ozone.test.LambdaTestUtils;
 import org.junit.jupiter.api.Assertions;
 import org.junit.jupiter.api.Test;
@@ -137,19 +140,7 @@ public class TestStateContext {
 
   @Test
   public void testReportQueueWithAddReports() throws IOException {
-    OzoneConfiguration conf = new OzoneConfiguration();
-    DatanodeStateMachine datanodeStateMachineMock =
-        mock(DatanodeStateMachine.class);
-    OzoneContainer o = mock(OzoneContainer.class);
-    ContainerSet s = mock(ContainerSet.class);
-    when(datanodeStateMachineMock.getContainer()).thenReturn(o);
-    when(o.getContainerSet()).thenReturn(s);
-    when(s.getContainerReport())
-        .thenReturn(
-            StorageContainerDatanodeProtocolProtos
-                .ContainerReportsProto.getDefaultInstance());
-    StateContext ctx = new StateContext(conf, DatanodeStates.getInitState(),
-        datanodeStateMachineMock);
+    StateContext ctx = createSubject();
     InetSocketAddress scm1 = new InetSocketAddress("scm1", 9001);
     ctx.addEndpoint(scm1);
     InetSocketAddress scm2 = new InetSocketAddress("scm2", 9001);
@@ -623,19 +614,7 @@ public class TestStateContext {
 
   @Test
   public void testCommandQueueSummary() throws IOException {
-    OzoneConfiguration conf = new OzoneConfiguration();
-    DatanodeStateMachine datanodeStateMachineMock =
-        mock(DatanodeStateMachine.class);
-    OzoneContainer o = mock(OzoneContainer.class);
-    ContainerSet s = mock(ContainerSet.class);
-    when(datanodeStateMachineMock.getContainer()).thenReturn(o);
-    when(o.getContainerSet()).thenReturn(s);
-    when(s.getContainerReport())
-        .thenReturn(
-            StorageContainerDatanodeProtocolProtos
-                .ContainerReportsProto.getDefaultInstance());
-    StateContext ctx = new StateContext(conf, DatanodeStates.getInitState(),
-        datanodeStateMachineMock);
+    StateContext ctx = createSubject();
     ctx.addCommand(new ReplicateContainerCommand(1, null));
     ctx.addCommand(new ClosePipelineCommand(PipelineID.randomId()));
     ctx.addCommand(new ReplicateContainerCommand(2, null));
@@ -652,4 +631,58 @@ public class TestStateContext {
         summary.get(SCMCommandProto.Type.closeContainerCommand).intValue());
   }
 
+  @Test
+  void updatesTermForCommandWithNewerTerm() throws IOException {
+    final long originalTerm = 1;
+    final long commandTerm = 2;
+    StateContext subject = createSubject();
+    SCMCommand<?> commandWithNewTerm = someCommand();
+    subject.setTermOfLeaderSCM(originalTerm);
+    commandWithNewTerm.setTerm(commandTerm);
+
+    subject.addCommand(commandWithNewTerm);
+
+    OptionalLong termOfLeaderSCM = subject.getTermOfLeaderSCM();
+    assertTrue(termOfLeaderSCM.isPresent());
+    assertEquals(commandTerm, termOfLeaderSCM.getAsLong());
+    assertEquals(commandWithNewTerm, subject.getNextCommand());
+  }
+
+  @Test
+  void keepsExistingTermForCommandWithOlderTerm() throws IOException {
+    final long originalTerm = 2;
+    final long commandTerm = 1;
+    StateContext subject = createSubject();
+    SCMCommand<?> commandWithNewTerm = someCommand();
+    subject.setTermOfLeaderSCM(originalTerm);
+    commandWithNewTerm.setTerm(commandTerm);
+
+    subject.addCommand(commandWithNewTerm);
+
+    OptionalLong termOfLeaderSCM = subject.getTermOfLeaderSCM();
+    assertTrue(termOfLeaderSCM.isPresent());
+    assertEquals(originalTerm, termOfLeaderSCM.getAsLong());
+    assertNull(subject.getNextCommand());
+  }
+
+  private static StateContext createSubject() throws IOException {
+    OzoneConfiguration conf = new OzoneConfiguration();
+    DatanodeStateMachine datanodeStateMachineMock =
+        mock(DatanodeStateMachine.class);
+    OzoneContainer o = mock(OzoneContainer.class);
+    ContainerSet s = mock(ContainerSet.class);
+    when(datanodeStateMachineMock.getContainer()).thenReturn(o);
+    when(o.getContainerSet()).thenReturn(s);
+    when(s.getContainerReport())
+        .thenReturn(
+            StorageContainerDatanodeProtocolProtos
+                .ContainerReportsProto.getDefaultInstance());
+    return new StateContext(conf, DatanodeStates.getInitState(),
+        datanodeStateMachineMock);
+  }
+
+  private static SCMCommand<?> someCommand() {
+    return new ReplicateContainerCommand(1, emptyList());
+  }
+
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to