This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new d6f63bf2e5 HDDS-7619. Update SCM term in datanode when command is received (#4072)
d6f63bf2e5 is described below
commit d6f63bf2e53ac8a4220ee829d8bbb2427dd7d3fa
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Tue Dec 13 14:27:07 2022 +0100
HDDS-7619. Update SCM term in datanode when command is received (#4072)
---
.../common/statemachine/StateContext.java | 34 ++++++---
.../common/statemachine/TestStateContext.java | 85 +++++++++++++++-------
2 files changed, 83 insertions(+), 36 deletions(-)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index eb44b3b8b6..3ed7ee4ae6 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -24,7 +24,7 @@ import java.util.HashSet;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
-import java.util.Optional;
+import java.util.OptionalLong;
import java.util.Queue;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
@@ -142,7 +142,7 @@ public class StateContext {
*
* For non-HA mode, term of SCMCommand will be 0.
*/
- private Optional<Long> termOfLeaderSCM = Optional.empty();
+ private OptionalLong termOfLeaderSCM = OptionalLong.empty();
/**
* Starting with a 2 sec heartbeat frequency which will be updated to the
@@ -720,10 +720,9 @@ public class StateContext {
// if commandQueue is not empty, init termOfLeaderSCM
// with the largest term found in commandQueue
- commandQueue.stream()
+ termOfLeaderSCM = commandQueue.stream()
.mapToLong(SCMCommand::getTerm)
- .max()
- .ifPresent(term -> termOfLeaderSCM = Optional.of(term));
+ .max();
}
/**
@@ -732,11 +731,24 @@ public class StateContext {
*/
private void updateTermOfLeaderSCM(SCMCommand<?> command) {
if (!termOfLeaderSCM.isPresent()) {
- LOG.error("should init termOfLeaderSCM before update it.");
return;
}
- termOfLeaderSCM = Optional.of(
- Long.max(termOfLeaderSCM.get(), command.getTerm()));
+
+ final long currentTerm = termOfLeaderSCM.getAsLong();
+ final long newTerm = command.getTerm();
+ if (currentTerm < newTerm) {
+ setTermOfLeaderSCM(newTerm);
+ }
+ }
+
+ @VisibleForTesting
+ void setTermOfLeaderSCM(long term) {
+ termOfLeaderSCM = OptionalLong.of(term);
+ }
+
+ @VisibleForTesting
+ OptionalLong getTermOfLeaderSCM() {
+ return termOfLeaderSCM;
}
/**
@@ -759,13 +771,14 @@ public class StateContext {
}
updateTermOfLeaderSCM(command);
- if (command.getTerm() == termOfLeaderSCM.get()) {
+ final long currentTerm = termOfLeaderSCM.getAsLong();
+ if (command.getTerm() == currentTerm) {
return command;
}
LOG.warn("Detect and drop a SCMCommand {} from stale leader SCM," +
" stale term {}, latest term {}.",
- command, command.getTerm(), termOfLeaderSCM.get());
+ command, command.getTerm(), currentTerm);
}
} finally {
lock.unlock();
@@ -780,6 +793,7 @@ public class StateContext {
public void addCommand(SCMCommand command) {
lock.lock();
try {
+ updateTermOfLeaderSCM(command);
commandQueue.add(command);
} finally {
lock.unlock();
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
index 5c5ee4e705..331d9f2bd9 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/TestStateContext.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.ozone.container.common.statemachine;
import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
+import static java.util.Collections.emptyList;
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ClosePipelineInfo;
import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerAction.Action.CLOSE;
import static org.apache.ozone.test.GenericTestUtils.waitFor;
@@ -37,6 +38,7 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import java.util.OptionalLong;
import java.util.concurrent.Callable;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
@@ -62,6 +64,7 @@ import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.ClosePipelineCommand;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.ozone.test.LambdaTestUtils;
import org.junit.jupiter.api.Assertions;
import org.junit.jupiter.api.Test;
@@ -137,19 +140,7 @@ public class TestStateContext {
@Test
public void testReportQueueWithAddReports() throws IOException {
- OzoneConfiguration conf = new OzoneConfiguration();
- DatanodeStateMachine datanodeStateMachineMock =
- mock(DatanodeStateMachine.class);
- OzoneContainer o = mock(OzoneContainer.class);
- ContainerSet s = mock(ContainerSet.class);
- when(datanodeStateMachineMock.getContainer()).thenReturn(o);
- when(o.getContainerSet()).thenReturn(s);
- when(s.getContainerReport())
- .thenReturn(
- StorageContainerDatanodeProtocolProtos
- .ContainerReportsProto.getDefaultInstance());
- StateContext ctx = new StateContext(conf, DatanodeStates.getInitState(),
- datanodeStateMachineMock);
+ StateContext ctx = createSubject();
InetSocketAddress scm1 = new InetSocketAddress("scm1", 9001);
ctx.addEndpoint(scm1);
InetSocketAddress scm2 = new InetSocketAddress("scm2", 9001);
@@ -623,19 +614,7 @@ public class TestStateContext {
@Test
public void testCommandQueueSummary() throws IOException {
- OzoneConfiguration conf = new OzoneConfiguration();
- DatanodeStateMachine datanodeStateMachineMock =
- mock(DatanodeStateMachine.class);
- OzoneContainer o = mock(OzoneContainer.class);
- ContainerSet s = mock(ContainerSet.class);
- when(datanodeStateMachineMock.getContainer()).thenReturn(o);
- when(o.getContainerSet()).thenReturn(s);
- when(s.getContainerReport())
- .thenReturn(
- StorageContainerDatanodeProtocolProtos
- .ContainerReportsProto.getDefaultInstance());
- StateContext ctx = new StateContext(conf, DatanodeStates.getInitState(),
- datanodeStateMachineMock);
+ StateContext ctx = createSubject();
ctx.addCommand(new ReplicateContainerCommand(1, null));
ctx.addCommand(new ClosePipelineCommand(PipelineID.randomId()));
ctx.addCommand(new ReplicateContainerCommand(2, null));
@@ -652,4 +631,58 @@ public class TestStateContext {
summary.get(SCMCommandProto.Type.closeContainerCommand).intValue());
}
+ @Test
+ void updatesTermForCommandWithNewerTerm() throws IOException {
+ final long originalTerm = 1;
+ final long commandTerm = 2;
+ StateContext subject = createSubject();
+ SCMCommand<?> commandWithNewTerm = someCommand();
+ subject.setTermOfLeaderSCM(originalTerm);
+ commandWithNewTerm.setTerm(commandTerm);
+
+ subject.addCommand(commandWithNewTerm);
+
+ OptionalLong termOfLeaderSCM = subject.getTermOfLeaderSCM();
+ assertTrue(termOfLeaderSCM.isPresent());
+ assertEquals(commandTerm, termOfLeaderSCM.getAsLong());
+ assertEquals(commandWithNewTerm, subject.getNextCommand());
+ }
+
+ @Test
+ void keepsExistingTermForCommandWithOlderTerm() throws IOException {
+ final long originalTerm = 2;
+ final long commandTerm = 1;
+ StateContext subject = createSubject();
+ SCMCommand<?> commandWithNewTerm = someCommand();
+ subject.setTermOfLeaderSCM(originalTerm);
+ commandWithNewTerm.setTerm(commandTerm);
+
+ subject.addCommand(commandWithNewTerm);
+
+ OptionalLong termOfLeaderSCM = subject.getTermOfLeaderSCM();
+ assertTrue(termOfLeaderSCM.isPresent());
+ assertEquals(originalTerm, termOfLeaderSCM.getAsLong());
+ assertNull(subject.getNextCommand());
+ }
+
+ private static StateContext createSubject() throws IOException {
+ OzoneConfiguration conf = new OzoneConfiguration();
+ DatanodeStateMachine datanodeStateMachineMock =
+ mock(DatanodeStateMachine.class);
+ OzoneContainer o = mock(OzoneContainer.class);
+ ContainerSet s = mock(ContainerSet.class);
+ when(datanodeStateMachineMock.getContainer()).thenReturn(o);
+ when(o.getContainerSet()).thenReturn(s);
+ when(s.getContainerReport())
+ .thenReturn(
+ StorageContainerDatanodeProtocolProtos
+ .ContainerReportsProto.getDefaultInstance());
+ return new StateContext(conf, DatanodeStates.getInitState(),
+ datanodeStateMachineMock);
+ }
+
+ private static SCMCommand<?> someCommand() {
+ return new ReplicateContainerCommand(1, emptyList());
+ }
+
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]