This is an automated email from the ASF dual-hosted git repository.
sodonnell pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new fefc405577 HDDS-9122. Make the Datanode CloseContainerCommandHandler
async by queuing commands in an executor (#5277)
fefc405577 is described below
commit fefc405577c63896e347724c5ab0db70370061ea
Author: Sumit Agrawal <[email protected]>
AuthorDate: Mon Sep 18 15:52:19 2023 +0530
HDDS-9122. Make the Datanode CloseContainerCommandHandler async by queuing
commands in an executor (#5277)
---
.../common/statemachine/DatanodeConfiguration.java | 31 +++++
.../common/statemachine/DatanodeStateMachine.java | 4 +-
.../CloseContainerCommandHandler.java | 155 ++++++++++++---------
.../commandhandler/CommandDispatcher.java | 6 +-
.../TestCloseContainerCommandHandler.java | 23 ++-
5 files changed, 146 insertions(+), 73 deletions(-)
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
index 1127fa69d0..bbcbcf12fe 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeConfiguration.java
@@ -40,6 +40,8 @@ public class DatanodeConfiguration {
static final String CONTAINER_DELETE_THREADS_MAX_KEY =
"hdds.datanode.container.delete.threads.max";
+ static final String CONTAINER_CLOSE_THREADS_MAX_KEY =
+ "hdds.datanode.container.close.threads.max";
static final String PERIODIC_DISK_CHECK_INTERVAL_MINUTES_KEY =
"hdds.datanode.periodic.disk.check.interval.minutes";
public static final String DISK_CHECK_FILE_SIZE_KEY =
@@ -115,6 +117,7 @@ public class DatanodeConfiguration {
private int numReadThreadPerVolume = 10;
static final int CONTAINER_DELETE_THREADS_DEFAULT = 2;
+ static final int CONTAINER_CLOSE_THREADS_DEFAULT = 3;
static final int BLOCK_DELETE_THREADS_DEFAULT = 5;
/**
@@ -130,6 +133,19 @@ public class DatanodeConfiguration {
)
private int containerDeleteThreads = CONTAINER_DELETE_THREADS_DEFAULT;
+ /**
+ * The maximum number of threads used to close containers on a datanode
+ * simultaneously.
+ */
+ @Config(key = "container.close.threads.max",
+ type = ConfigType.INT,
+ defaultValue = "3",
+ tags = {DATANODE},
+ description = "The maximum number of threads used to close containers " +
+ "on a datanode"
+ )
+ private int containerCloseThreads = CONTAINER_CLOSE_THREADS_DEFAULT;
+
/**
* The maximum number of threads used to handle delete block commands.
* It takes about 200ms to open a RocksDB with HDD media, so basically DN
@@ -503,6 +519,13 @@ public class DatanodeConfiguration {
containerDeleteThreads = CONTAINER_DELETE_THREADS_DEFAULT;
}
+ if (containerCloseThreads < 1) {
+ LOG.warn(CONTAINER_CLOSE_THREADS_MAX_KEY + " must be greater than zero" +
+ " and was set to {}. Defaulting to {}",
+ containerCloseThreads, CONTAINER_CLOSE_THREADS_DEFAULT);
+ containerCloseThreads = CONTAINER_CLOSE_THREADS_DEFAULT;
+ }
+
if (periodicDiskCheckIntervalMinutes < 1) {
LOG.warn(PERIODIC_DISK_CHECK_INTERVAL_MINUTES_KEY +
" must be greater than zero and was set to {}. Defaulting to {}",
@@ -621,6 +644,14 @@ public class DatanodeConfiguration {
return containerDeleteThreads;
}
+ /**
+ * Returns the configured maximum number of threads used to close
+ * containers on this datanode
+ * (hdds.datanode.container.close.threads.max).
+ *
+ * @return the container-close thread count
+ */
+ public int getContainerCloseThreads() {
+ return this.containerCloseThreads;
+ }
+
+ /**
+ * Overrides the maximum number of threads used to close containers.
+ * This setter stores the value as given; it does not itself reject
+ * values below 1.
+ *
+ * @param containerCloseThreads the container-close thread count to use
+ */
+ public void setContainerCloseThreads(int containerCloseThreads) {
+ this.containerCloseThreads = containerCloseThreads;
+ }
+
public long getPeriodicDiskCheckIntervalMinutes() {
return periodicDiskCheckIntervalMinutes;
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index a3724e0c47..5c18e86bd3 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -225,7 +225,9 @@ public class DatanodeStateMachine implements Closeable {
// When we add new handlers just adding a new handler here should do the
// trick.
commandDispatcher = CommandDispatcher.newBuilder()
- .addHandler(new CloseContainerCommandHandler())
+ .addHandler(new CloseContainerCommandHandler(
+ dnConf.getContainerCloseThreads(),
+ dnConf.getCommandQueueLimit()))
.addHandler(new DeleteBlocksCommandHandler(getContainer(),
conf, dnConf.getBlockDeleteThreads(),
dnConf.getBlockDeleteQueueLimit()))
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
index e7a0b2e9f8..b34082550f 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
@@ -16,6 +16,13 @@
*/
package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.RejectedExecutionException;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto
@@ -49,13 +56,22 @@ public class CloseContainerCommandHandler implements CommandHandler {
private static final Logger LOG =
LoggerFactory.getLogger(CloseContainerCommandHandler.class);
- private AtomicLong invocationCount = new AtomicLong(0);
+ private final AtomicLong invocationCount = new AtomicLong(0);
+ private final AtomicInteger queuedCount = new AtomicInteger(0);
+ private final ExecutorService executor;
private long totalTime;
-/**
-* Constructs a ContainerReport handler.
-*/
+/**
+ * Constructs a handler that closes containers asynchronously on a
+ * fixed-size thread pool fed by a bounded queue.
+ *
+ * @param threadPoolSize number of worker threads (used as both core and
+ * maximum pool size); must be positive
+ * @param queueSize capacity of the pending-command queue; must be
+ * positive. Once the queue is full the executor rejects further
+ * commands with RejectedExecutionException.
+ */
- public CloseContainerCommandHandler() {
+ public CloseContainerCommandHandler(
+ int threadPoolSize, int queueSize) {
+ executor = new ThreadPoolExecutor(
+ threadPoolSize, threadPoolSize,
+ 0L, TimeUnit.MILLISECONDS,
+ new LinkedBlockingQueue<>(queueSize),
+ new ThreadFactoryBuilder()
+ // Nothing in this handler shuts the pool down and its core threads
+ // never time out, so use daemon threads to avoid holding the JVM
+ // open when the datanode stops.
+ .setDaemon(true)
+ .setNameFormat("CloseContainerThread-%d").build());
}
/**
@@ -69,73 +85,78 @@ public class CloseContainerCommandHandler implements CommandHandler {
@Override
public void handle(SCMCommand command, OzoneContainer ozoneContainer,
StateContext context, SCMConnectionManager connectionManager) {
- invocationCount.incrementAndGet();
- final long startTime = Time.monotonicNow();
- final DatanodeDetails datanodeDetails = context.getParent()
- .getDatanodeDetails();
- final CloseContainerCommandProto closeCommand =
- ((CloseContainerCommand)command).getProto();
- final ContainerController controller = ozoneContainer.getController();
- final long containerId = closeCommand.getContainerID();
- LOG.debug("Processing Close Container command container #{}",
- containerId);
- try {
- final Container container = controller.getContainer(containerId);
-
- if (container == null) {
- LOG.error("Container #{} does not exist in datanode. "
- + "Container close failed.", containerId);
- return;
- }
-
- // move the container to CLOSING if in OPEN state
- controller.markContainerForClose(containerId);
-
- switch (container.getContainerState()) {
- case OPEN:
- case CLOSING:
- // If the container is part of open pipeline, close it via write channel
- if (ozoneContainer.getWriteChannel()
- .isExist(closeCommand.getPipelineID())) {
- ContainerCommandRequestProto request =
- getContainerCommandRequestProto(datanodeDetails,
- closeCommand.getContainerID(),
- command.getEncodedToken());
- ozoneContainer.getWriteChannel()
- .submitRequest(request, closeCommand.getPipelineID());
- } else if (closeCommand.getForce()) {
- // Non-RATIS containers should have the force close flag set, so they
- // are moved to CLOSED immediately rather than going to quasi-closed.
- controller.closeContainer(containerId);
- } else {
- controller.quasiCloseContainer(containerId,
- "Ratis pipeline does not exist");
- LOG.info("Marking Container {} quasi closed", containerId);
+ // Count the command as queued until its task finishes. whenComplete
+ // decrements the counter; if the executor rejects the task, the catch
+ // block below decrements it instead so the count cannot drift upward.
+ queuedCount.incrementAndGet();
+ try {
+ CompletableFuture.runAsync(() -> {
+ invocationCount.incrementAndGet();
+ final long startTime = Time.monotonicNow();
+ final DatanodeDetails datanodeDetails = context.getParent()
+ .getDatanodeDetails();
+ final CloseContainerCommandProto closeCommand =
+ ((CloseContainerCommand) command).getProto();
+ final ContainerController controller = ozoneContainer.getController();
+ final long containerId = closeCommand.getContainerID();
+ LOG.debug("Processing Close Container command container #{}",
+ containerId);
+ try {
+ final Container container = controller.getContainer(containerId);
+
+ if (container == null) {
+ LOG.error("Container #{} does not exist in datanode. "
+ + "Container close failed.", containerId);
+ return;
}
- break;
- case QUASI_CLOSED:
- if (closeCommand.getForce()) {
- controller.closeContainer(containerId);
+
+ // move the container to CLOSING if in OPEN state
+ controller.markContainerForClose(containerId);
+
+ switch (container.getContainerState()) {
+ case OPEN:
+ case CLOSING:
+ // If the container is part of open pipeline, close it via
+ // write channel
+ if (ozoneContainer.getWriteChannel()
+ .isExist(closeCommand.getPipelineID())) {
+ ContainerCommandRequestProto request =
+ getContainerCommandRequestProto(datanodeDetails,
+ closeCommand.getContainerID(),
+ command.getEncodedToken());
+ ozoneContainer.getWriteChannel()
+ .submitRequest(request, closeCommand.getPipelineID());
+ } else if (closeCommand.getForce()) {
+ // Non-RATIS containers should have the force close flag set, so
+ // they are moved to CLOSED immediately rather than going to
+ // quasi-closed
+ controller.closeContainer(containerId);
+ } else {
+ controller.quasiCloseContainer(containerId,
+ "Ratis pipeline does not exist");
+ LOG.info("Marking Container {} quasi closed", containerId);
+ }
+ break;
+ case QUASI_CLOSED:
+ if (closeCommand.getForce()) {
+ controller.closeContainer(containerId);
+ }
+ break;
+ case CLOSED:
+ break;
+ case UNHEALTHY:
+ case INVALID:
+ LOG.debug("Cannot close the container #{}, the container is"
+ + " in {} state.", containerId, container.getContainerState());
+ break;
+ default:
+ break;
}
- break;
- case CLOSED:
- break;
- case UNHEALTHY:
- case INVALID:
- LOG.debug("Cannot close the container #{}, the container is"
- + " in {} state.", containerId, container.getContainerState());
- break;
- default:
- break;
+ } catch (NotLeaderException e) {
+ LOG.debug("Follower cannot close container #{}.", containerId);
+ } catch (IOException e) {
+ LOG.error("Can't close container #{}", containerId, e);
+ } finally {
+ long endTime = Time.monotonicNow();
+ // Several pool threads can reach this point at once, so guard the
+ // read-modify-write of totalTime to avoid lost updates.
+ // NOTE(review): readers of totalTime should synchronize on the
+ // same monitor to get a consistent, visible value.
+ synchronized (this) {
+ totalTime += endTime - startTime;
+ }
}
- } catch (NotLeaderException e) {
- LOG.debug("Follower cannot close container #{}.", containerId);
- } catch (IOException e) {
- LOG.error("Can't close container #{}", containerId, e);
- } finally {
- long endTime = Time.monotonicNow();
- totalTime += endTime - startTime;
- }
+ }, executor).whenComplete((v, e) -> {
+ queuedCount.decrementAndGet();
+ // Unchecked exceptions from the task would otherwise be dropped
+ // silently by the CompletableFuture, so surface them here.
+ if (e != null) {
+ LOG.error("Unexpected error while processing close container"
+ + " command", e);
+ }
+ });
+ } catch (RejectedExecutionException rejected) {
+ // The bounded executor queue is full: roll back the increment so
+ // getQueuedCount() stays accurate, then rethrow so the caller sees
+ // the rejection.
+ queuedCount.decrementAndGet();
+ throw rejected;
+ }
}
private ContainerCommandRequestProto getContainerCommandRequestProto(
@@ -195,6 +216,6 @@ public class CloseContainerCommandHandler implements CommandHandler {
+ /**
+ * Returns how many close commands have been passed to handle() but have
+ * not yet finished asynchronous processing (including ones currently
+ * running on the executor).
+ *
+ * @return the number of outstanding close commands
+ */
@Override
public int getQueuedCount() {
- return 0;
+ return queuedCount.intValue();
}
}
diff --git a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
index ebd862ca52..b6ec70b2ef 100644
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
@@ -100,7 +100,11 @@ public final class CommandDispatcher {
CommandHandler handler = handlerMap.get(command.getType());
if (handler != null) {
commandHandlerMetrics.increaseCommandCount(command.getType());
- handler.handle(command, container, context, connectionManager);
+ try {
+ handler.handle(command, container, context, connectionManager);
+ } catch (Exception ex) {
+ LOG.error("Exception while handle command, ", ex);
+ }
} else {
LOG.error("Unknown SCM Command queued. There is no handler for this " +
"command. Command: {}", command.getType().getDescriptorForType()
diff --git a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java
index 4d3d435ed2..e6267550be 100644
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java
@@ -16,6 +16,7 @@
*/
package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+import java.util.concurrent.TimeoutException;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
@@ -70,7 +71,7 @@ public class TestCloseContainerCommandHandler {
private ContainerController controller;
private ContainerSet containerSet;
private CloseContainerCommandHandler subject =
- new CloseContainerCommandHandler();
+ new CloseContainerCommandHandler(1, 1000);
private final ContainerLayoutVersion layout;
@@ -120,7 +121,7 @@ public class TestCloseContainerCommandHandler {
public void closeContainerWithPipeline() throws Exception {
// close a container that's associated with an existing pipeline
subject.handle(closeWithKnownPipeline(), ozoneContainer, context, null);
-
+ waitTillFinishExecution(subject);
verify(containerHandler)
.markContainerForClose(container);
verify(writeChannel)
@@ -130,9 +131,10 @@ public class TestCloseContainerCommandHandler {
}
@Test
- public void closeContainerWithoutPipeline() throws IOException {
+ public void closeContainerWithoutPipeline() throws Exception {
// close a container that's NOT associated with an open pipeline
subject.handle(closeWithUnknownPipeline(), ozoneContainer, context, null);
+ waitTillFinishExecution(subject);
verify(containerHandler)
.markContainerForClose(container);
@@ -145,9 +147,10 @@ public class TestCloseContainerCommandHandler {
}
@Test
- public void closeContainerWithForceFlagSet() throws IOException {
+ public void closeContainerWithForceFlagSet() throws Exception {
// close a container that's associated with an existing pipeline
subject.handle(forceCloseWithoutPipeline(), ozoneContainer, context, null);
+ waitTillFinishExecution(subject);
verify(containerHandler)
.markContainerForClose(container);
@@ -162,6 +165,7 @@ public class TestCloseContainerCommandHandler {
.setState(ContainerProtos.ContainerDataProto.State.QUASI_CLOSED);
subject.handle(forceCloseWithoutPipeline(), ozoneContainer, context, null);
+ waitTillFinishExecution(subject);
verify(writeChannel, never())
.submitRequest(any(), any());
@@ -173,6 +177,7 @@ public class TestCloseContainerCommandHandler {
public void forceCloseOpenContainer() throws Exception {
// force-close a container that's NOT associated with an open pipeline
subject.handle(forceCloseWithoutPipeline(), ozoneContainer, context, null);
+ waitTillFinishExecution(subject);
verify(writeChannel, never())
.submitRequest(any(), any());
@@ -186,6 +191,7 @@ public class TestCloseContainerCommandHandler {
public void forceCloseOpenContainerWithPipeline() throws Exception {
// force-close a container that's associated with an existing pipeline
subject.handle(forceCloseWithPipeline(), ozoneContainer, context, null);
+ waitTillFinishExecution(subject);
verify(containerHandler)
.markContainerForClose(container);
@@ -205,7 +211,9 @@ public class TestCloseContainerCommandHandler {
// Since the container is already closed, these commands should do nothing,
// neither should they fail
subject.handle(closeWithUnknownPipeline(), ozoneContainer, context, null);
+ waitTillFinishExecution(subject);
subject.handle(closeWithKnownPipeline(), ozoneContainer, context, null);
+ waitTillFinishExecution(subject);
verify(containerHandler, never())
.markContainerForClose(container);
@@ -279,4 +287,11 @@ public class TestCloseContainerCommandHandler {
.addPort(restPort);
return builder.build();
}
+
+ /**
+ * Blocks until the handler reports no outstanding close commands,
+ * polling every 10 ms; throws TimeoutException after 3 s.
+ *
+ * @param closeHandler the handler whose queue should drain
+ */
+ private void waitTillFinishExecution(
+ CloseContainerCommandHandler closeHandler)
+ throws InterruptedException, TimeoutException {
+ final int pollIntervalMillis = 10;
+ final int timeoutMillis = 3000;
+ GenericTestUtils.waitFor(
+ () -> closeHandler.getQueuedCount() <= 0,
+ pollIntervalMillis, timeoutMillis);
+ }
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]