This is an automated email from the ASF dual-hosted git repository.
pifta pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new cc4e026d59 HDDS-11304. Make up for the missing functionality in
CommandDispatcher (#7062)
cc4e026d59 is described below
commit cc4e026d59e58273c412de15e0d4e6d5715848a3
Author: jianghuazhu <[email protected]>
AuthorDate: Thu Aug 29 23:48:56 2024 +0800
HDDS-11304. Make up for the missing functionality in CommandDispatcher
(#7062)
---
.../CloseContainerCommandHandler.java | 15 ++++--
.../commandhandler/CommandDispatcher.java | 17 +++----
.../commandhandler/DeleteBlocksCommandHandler.java | 4 +-
.../DeleteContainerCommandHandler.java | 17 +++++--
.../TestCloseContainerCommandHandler.java | 27 ++++++++++
.../TestDeleteContainerCommandHandler.java | 59 +++++++++++++++++++---
6 files changed, 113 insertions(+), 26 deletions(-)
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
index 8533f7384d..bc703ac6a5 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
@@ -18,7 +18,6 @@ package
org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
@@ -58,11 +57,11 @@ public class CloseContainerCommandHandler implements
CommandHandler {
private final AtomicLong invocationCount = new AtomicLong(0);
private final AtomicInteger queuedCount = new AtomicInteger(0);
- private final ExecutorService executor;
+ private final ThreadPoolExecutor executor;
private long totalTime;
/**
- * Constructs a ContainerReport handler.
+ * Constructs a close container command handler.
*/
public CloseContainerCommandHandler(
int threadPoolSize, int queueSize, String threadNamePrefix) {
@@ -220,4 +219,14 @@ public class CloseContainerCommandHandler implements
CommandHandler {
public int getQueuedCount() {
return queuedCount.get();
}
+
+ @Override
+ public int getThreadPoolMaxPoolSize() {
+ return executor.getMaximumPoolSize();
+ }
+
+ @Override
+ public int getThreadPoolActivePoolSize() {
+ return executor.getActiveCount();
+ }
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
index 9035b79c67..c3f8da74c7 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
@@ -56,11 +56,6 @@ public final class CommandDispatcher {
private CommandDispatcher(OzoneContainer container, SCMConnectionManager
connectionManager, StateContext context,
CommandHandler... handlers) {
- Preconditions.checkNotNull(context);
- Preconditions.checkNotNull(handlers);
- Preconditions.checkArgument(handlers.length > 0);
- Preconditions.checkNotNull(container);
- Preconditions.checkNotNull(connectionManager);
this.context = context;
this.container = container;
this.connectionManager = connectionManager;
@@ -77,6 +72,7 @@ public final class CommandDispatcher {
commandHandlerMetrics = CommandHandlerMetrics.create(handlerMap);
}
+ @VisibleForTesting
public CommandHandler getCloseContainerHandler() {
return handlerMap.get(Type.closeContainerCommand);
}
@@ -201,11 +197,12 @@ public final class CommandDispatcher {
* @return Command Dispatcher.
*/
public CommandDispatcher build() {
- Preconditions.checkNotNull(this.connectionManager, "Missing connection" +
- " manager.");
- Preconditions.checkNotNull(this.container, "Missing container.");
- Preconditions.checkNotNull(this.context, "Missing context.");
- Preconditions.checkArgument(this.handlerList.size() > 0);
+ Preconditions.checkNotNull(this.connectionManager,
+ "Missing scm connection manager.");
+ Preconditions.checkNotNull(this.container, "Missing ozone container.");
+ Preconditions.checkNotNull(this.context, "Missing state context.");
+ Preconditions.checkArgument(this.handlerList.size() > 0,
+ "The number of command handlers must be greater than 0.");
return new CommandDispatcher(this.container, this.connectionManager,
this.context, handlerList.toArray(
new CommandHandler[handlerList.size()]));
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
index 747749066e..bd7431c614 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
@@ -168,12 +168,12 @@ public class DeleteBlocksCommandHandler implements
CommandHandler {
@Override
public int getThreadPoolMaxPoolSize() {
- return ((ThreadPoolExecutor)executor).getMaximumPoolSize();
+ return executor.getMaximumPoolSize();
}
@Override
public int getThreadPoolActivePoolSize() {
- return ((ThreadPoolExecutor)executor).getActiveCount();
+ return executor.getActiveCount();
}
/**
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java
index ead81c32e5..b76e306e1c 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java
@@ -36,7 +36,6 @@ import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.time.Clock;
import java.util.OptionalLong;
-import java.util.concurrent.ExecutorService;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
@@ -53,7 +52,7 @@ public class DeleteContainerCommandHandler implements
CommandHandler {
private final AtomicInteger invocationCount = new AtomicInteger(0);
private final AtomicInteger timeoutCount = new AtomicInteger(0);
private final AtomicLong totalTime = new AtomicLong(0);
- private final ExecutorService executor;
+ private final ThreadPoolExecutor executor;
private final Clock clock;
private int maxQueueSize;
@@ -70,7 +69,7 @@ public class DeleteContainerCommandHandler implements
CommandHandler {
}
protected DeleteContainerCommandHandler(Clock clock,
- ExecutorService executor, int queueSize) {
+ ThreadPoolExecutor executor, int queueSize) {
this.executor = executor;
this.clock = clock;
maxQueueSize = queueSize;
@@ -131,7 +130,7 @@ public class DeleteContainerCommandHandler implements
CommandHandler {
@Override
public int getQueuedCount() {
- return ((ThreadPoolExecutor)executor).getQueue().size();
+ return executor.getQueue().size();
}
@Override
@@ -160,6 +159,16 @@ public class DeleteContainerCommandHandler implements
CommandHandler {
return totalTime.get();
}
+ @Override
+ public int getThreadPoolMaxPoolSize() {
+ return executor.getMaximumPoolSize();
+ }
+
+ @Override
+ public int getThreadPoolActivePoolSize() {
+ return executor.getActiveCount();
+ }
+
@Override
public void stop() {
try {
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java
index 219645c8ed..a3b60aa36d 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java
@@ -35,6 +35,7 @@ import
org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.apache.ozone.test.GenericTestUtils;
+import org.junit.jupiter.api.Test;
import java.io.IOException;
import java.util.UUID;
@@ -43,6 +44,8 @@ import static java.util.Collections.singletonMap;
import static org.apache.hadoop.ozone.OzoneConsts.GB;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertThrows;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.eq;
import static org.mockito.Mockito.mock;
@@ -292,4 +295,28 @@ public class TestCloseContainerCommandHandler {
GenericTestUtils.waitFor(()
-> closeHandler.getQueuedCount() <= 0, 10, 3000);
}
+
+ @Test
+ public void testThreadPoolPoolSize() {
+ assertEquals(1, subject.getThreadPoolMaxPoolSize());
+ assertEquals(0, subject.getThreadPoolActivePoolSize());
+
+ CloseContainerCommandHandler closeContainerCommandHandler =
+ new CloseContainerCommandHandler(10, 10, "");
+ closeContainerCommandHandler.handle(new CloseContainerCommand(
+ CONTAINER_ID + 1, PipelineID.randomId()),
+ ozoneContainer, context, null);
+ closeContainerCommandHandler.handle(new CloseContainerCommand(
+ CONTAINER_ID + 2, PipelineID.randomId()),
+ ozoneContainer, context, null);
+ closeContainerCommandHandler.handle(new CloseContainerCommand(
+ CONTAINER_ID + 3, PipelineID.randomId()),
+ ozoneContainer, context, null);
+ closeContainerCommandHandler.handle(new CloseContainerCommand(
+ CONTAINER_ID + 4, PipelineID.randomId()),
+ ozoneContainer, context, null);
+ assertEquals(10, closeContainerCommandHandler.getThreadPoolMaxPoolSize());
+ assertTrue(closeContainerCommandHandler.getThreadPoolActivePoolSize() > 0);
+ }
+
}
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerCommandHandler.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerCommandHandler.java
index 49c34828fb..5ee31b97fd 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerCommandHandler.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerCommandHandler.java
@@ -19,6 +19,14 @@ package
org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
@@ -32,7 +40,6 @@ import java.time.Instant;
import java.time.ZoneId;
import java.util.OptionalLong;
-import static
com.google.common.util.concurrent.MoreExecutors.newDirectExecutorService;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.mock;
@@ -63,8 +70,14 @@ public class TestDeleteContainerCommandHandler {
}
@Test
- public void testExpiredCommandsAreNotProcessed() throws IOException {
- DeleteContainerCommandHandler handler = createSubject(clock, 1000);
+ public void testExpiredCommandsAreNotProcessed()
+ throws IOException, InterruptedException {
+ CountDownLatch latch1 = new CountDownLatch(1);
+ ThreadFactory threadFactory = new ThreadFactoryBuilder().build();
+ ThreadPoolWithLockExecutor executor = new ThreadPoolWithLockExecutor(
+ threadFactory, latch1);
+ DeleteContainerCommandHandler handler = new DeleteContainerCommandHandler(
+ clock, executor, 100);
DeleteContainerCommand command1 = new DeleteContainerCommand(1L);
command1.setDeadline(clock.millis() + 10000);
@@ -75,9 +88,14 @@ public class TestDeleteContainerCommandHandler {
clock.fastForward(15000);
handler.handle(command1, ozoneContainer, null, null);
+ latch1.await();
assertEquals(1, handler.getTimeoutCount());
+ CountDownLatch latch2 = new CountDownLatch(2);
+ executor.setLatch(latch2);
handler.handle(command2, ozoneContainer, null, null);
handler.handle(command3, ozoneContainer, null, null);
+ latch2.await();
+
assertEquals(1, handler.getTimeoutCount());
assertEquals(3, handler.getInvocationCount());
verify(controller, times(0))
@@ -89,7 +107,8 @@ public class TestDeleteContainerCommandHandler {
}
@Test
- public void testCommandForCurrentTermIsExecuted() throws IOException {
+ public void testCommandForCurrentTermIsExecuted()
+ throws IOException, InterruptedException {
// GIVEN
DeleteContainerCommand command = new DeleteContainerCommand(1L);
command.setTerm(1);
@@ -97,10 +116,17 @@ public class TestDeleteContainerCommandHandler {
when(context.getTermOfLeaderSCM())
.thenReturn(OptionalLong.of(command.getTerm()));
- DeleteContainerCommandHandler subject = createSubject();
+ TestClock testClock = new TestClock(Instant.now(), ZoneId.systemDefault());
+ CountDownLatch latch = new CountDownLatch(1);
+ ThreadFactory threadFactory = new ThreadFactoryBuilder().build();
+ ThreadPoolWithLockExecutor executor = new ThreadPoolWithLockExecutor(
+ threadFactory, latch);
+ DeleteContainerCommandHandler subject = new DeleteContainerCommandHandler(
+ testClock, executor, 100);
// WHEN
subject.handle(command, ozoneContainer, context, null);
+ latch.await();
// THEN
verify(controller, times(1))
@@ -163,8 +189,10 @@ public class TestDeleteContainerCommandHandler {
private static DeleteContainerCommandHandler createSubject(
TestClock clock, int queueSize) {
- return new DeleteContainerCommandHandler(clock,
- newDirectExecutorService(), queueSize);
+ ThreadFactory threadFactory = new ThreadFactoryBuilder().build();
+ ThreadPoolExecutor executor = (ThreadPoolExecutor) Executors.
+ newFixedThreadPool(1, threadFactory);
+ return new DeleteContainerCommandHandler(clock, executor, queueSize);
}
private static DeleteContainerCommandHandler createSubjectWithPoolSize(
@@ -172,4 +200,21 @@ public class TestDeleteContainerCommandHandler {
return new DeleteContainerCommandHandler(1, clock, queueSize, "");
}
+ static class ThreadPoolWithLockExecutor extends ThreadPoolExecutor {
+ private CountDownLatch countDownLatch;
+ ThreadPoolWithLockExecutor(ThreadFactory threadFactory, CountDownLatch
latch) {
+ super(1, 1, 0, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<Runnable>(), threadFactory);
+ this.countDownLatch = latch;
+ }
+
+ void setLatch(CountDownLatch latch) {
+ this.countDownLatch = latch;
+ }
+
+ @Override
+ protected void afterExecute(Runnable r, Throwable t) {
+ countDownLatch.countDown();
+ }
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]