This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 2feb7388aa HDDS-8554. Thread pool size needs to be decreased in
different order in ReplicationSupervisor (#4669)
2feb7388aa is described below
commit 2feb7388aa7bf24eb61f4a231301498378763d11
Author: Doroszlai, Attila <[email protected]>
AuthorDate: Sat May 6 09:14:50 2023 +0200
HDDS-8554. Thread pool size needs to be decreased in different order in
ReplicationSupervisor (#4669)
---
.../replication/ReplicationSupervisor.java | 9 ++++--
.../replication/TestReplicationSupervisor.java | 33 +++++++++++++++++++++-
2 files changed, 39 insertions(+), 3 deletions(-)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
index 1276dfcf23..63d5e2b0c2 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/replication/ReplicationSupervisor.java
@@ -161,8 +161,13 @@ public final class ReplicationSupervisor {
.build());
executor = tpe;
executorThreadUpdater = threadCount -> {
- tpe.setMaximumPoolSize(threadCount);
- tpe.setCorePoolSize(threadCount);
+ if (threadCount < tpe.getCorePoolSize()) {
+ tpe.setCorePoolSize(threadCount);
+ tpe.setMaximumPoolSize(threadCount);
+ } else {
+ tpe.setMaximumPoolSize(threadCount);
+ tpe.setCorePoolSize(threadCount);
+ }
};
}
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
index b3df82ada2..739a4f5965 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/replication/TestReplicationSupervisor.java
@@ -75,6 +75,9 @@ import javax.annotation.Nonnull;
import static com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
import static java.util.Collections.emptyList;
import static java.util.Collections.singletonList;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_MAINTENANCE;
+import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE;
import static org.apache.hadoop.ozone.container.replication.AbstractReplicationTask.Status.DONE;
import static org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand.fromSources;
import static org.junit.jupiter.api.Assertions.fail;
@@ -648,6 +651,34 @@ public class TestReplicationSupervisor {
}
}
+ @Test
+ public void poolSizeCanBeIncreased() {
+ datanode.setPersistedOpState(IN_SERVICE);
+ ReplicationSupervisor subject = ReplicationSupervisor.newBuilder()
+ .stateContext(context)
+ .build();
+
+ try {
+ subject.nodeStateUpdated(ENTERING_MAINTENANCE);
+ } finally {
+ subject.stop();
+ }
+ }
+
+ @Test
+ public void poolSizeCanBeDecreased() {
+ datanode.setPersistedOpState(IN_MAINTENANCE);
+ ReplicationSupervisor subject = ReplicationSupervisor.newBuilder()
+ .stateContext(context)
+ .build();
+
+ try {
+ subject.nodeStateUpdated(IN_SERVICE);
+ } finally {
+ subject.stop();
+ }
+ }
+
@Test
public void testMaxQueueSize() {
List<DatanodeDetails> datanodes = new ArrayList<>();
@@ -688,7 +719,7 @@ public class TestReplicationSupervisor {
Assert.assertEquals(2 * maxQueueSize, rs.getTotalInFlightReplications());
// queue size is restored
- rs.nodeStateUpdated(HddsProtos.NodeOperationalState.IN_SERVICE);
+ rs.nodeStateUpdated(IN_SERVICE);
Assert.assertEquals(maxQueueSize, rs.getMaxQueueSize());
Assert.assertEquals(replicationMaxStreams, threadPoolSize.get());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]