This is an automated email from the ASF dual-hosted git repository.
aengineer pushed a commit to branch trunk
in repository https://gitbox.apache.org/repos/asf/hadoop.git
The following commit(s) were added to refs/heads/trunk by this push:
new ddc0a40 HDDS-896. Handle over replicated containers in SCM.
Contributed by Nandakumar.
ddc0a40 is described below
commit ddc0a405074c1e850193ca1d83556f622aa04435
Author: Anu Engineer <[email protected]>
AuthorDate: Fri Jan 4 09:04:10 2019 -0800
HDDS-896. Handle over replicated containers in SCM.
Contributed by Nandakumar.
---
.../ozone/container/common/interfaces/Handler.java | 9 ++
.../common/statemachine/DatanodeStateMachine.java | 3 +
.../common/statemachine/StateContext.java | 34 +++--
.../CloseContainerCommandHandler.java | 94 ++++++------
.../DeleteContainerCommandHandler.java | 84 ++++++++++
.../states/endpoint/HeartbeatEndpointTask.java | 11 ++
.../ozone/container/keyvalue/KeyValueHandler.java | 62 ++++----
.../container/ozoneimpl/ContainerController.java | 12 +-
.../protocol/commands/CloseContainerCommand.java | 13 +-
.../protocol/commands/DeleteBlocksCommand.java | 6 +-
...terCommand.java => DeleteContainerCommand.java} | 57 ++++---
.../commands/ReplicateContainerCommand.java | 4 -
.../ozone/protocol/commands/ReregisterCommand.java | 11 +-
.../hadoop/ozone/protocol/commands/SCMCommand.java | 2 +-
.../TestCloseContainerCommandHandler.java | 18 ++-
.../scm/command/CommandStatusReportHandler.java | 12 ++
.../container/DeleteContainerCommandWatcher.java | 56 +++++++
.../hdds/scm/container/ReportHandlerHelper.java | 5 +-
.../container/replication/ReplicationManager.java | 170 ++++++++++++++++++---
.../apache/hadoop/hdds/scm/events/SCMEvents.java | 14 ++
.../replication/TestReplicationManager.java | 95 ++++++++----
.../container/ozoneimpl/TestOzoneContainer.java | 5 +-
22 files changed, 564 insertions(+), 213 deletions(-)
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
index 9f520d5..972eb83 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/interfaces/Handler.java
@@ -146,6 +146,15 @@ public abstract class Handler {
public abstract void closeContainer(Container container)
throws IOException;
+ /**
+ * Deletes the given container.
+ *
+ * @param container container to be deleted
+ * @throws IOException
+ */
+ public abstract void deleteContainer(Container container)
+ throws IOException;
+
public void setScmID(String scmId) {
this.scmID = scmId;
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
index ab860d6..7f5233f 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/DatanodeStateMachine.java
@@ -41,6 +41,8 @@ import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
.DeleteBlocksCommandHandler;
import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
+ .DeleteContainerCommandHandler;
+import org.apache.hadoop.ozone.container.common.statemachine.commandhandler
.ReplicateContainerCommandHandler;
import org.apache.hadoop.ozone.container.keyvalue.TarContainerPacker;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
@@ -115,6 +117,7 @@ public class DatanodeStateMachine implements Closeable {
.addHandler(new DeleteBlocksCommandHandler(container.getContainerSet(),
conf))
.addHandler(new ReplicateContainerCommandHandler(conf, supervisor))
+ .addHandler(new DeleteContainerCommandHandler())
.setConnectionManager(connectionManager)
.setContainer(container)
.setContext(context)
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index 195e51b..4a979fd 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -19,11 +19,10 @@ package org.apache.hadoop.ozone.container.common.statemachine;
import com.google.common.base.Preconditions;
import com.google.protobuf.GeneratedMessage;
import java.util.Map;
+import java.util.Optional;
import java.util.concurrent.ConcurrentHashMap;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.SCMCommandProto.Type;
-import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.PipelineAction;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerAction;
@@ -427,22 +426,27 @@ public class StateContext {
* @param cmd - {@link SCMCommand}.
*/
public void addCmdStatus(SCMCommand cmd) {
- if (cmd.getType().equals(Type.closeContainerCommand)) {
- // We will be removing CommandStatus completely.
- // As a first step, removed it for CloseContainerCommand.
- return;
- }
- CommandStatusBuilder statusBuilder;
- if (cmd.getType() == Type.deleteBlocksCommand) {
- statusBuilder = new DeleteBlockCommandStatusBuilder();
- } else {
- statusBuilder = CommandStatusBuilder.newBuilder();
+ final Optional<CommandStatusBuilder> cmdStatusBuilder;
+ switch (cmd.getType()) {
+ case replicateContainerCommand:
+ cmdStatusBuilder = Optional.of(CommandStatusBuilder.newBuilder());
+ break;
+ case deleteBlocksCommand:
+ cmdStatusBuilder = Optional.of(
+ DeleteBlockCommandStatusBuilder.newBuilder());
+ break;
+ case deleteContainerCommand:
+ cmdStatusBuilder = Optional.of(CommandStatusBuilder.newBuilder());
+ break;
+ default:
+ cmdStatusBuilder = Optional.empty();
}
- this.addCmdStatus(cmd.getId(),
- statusBuilder.setCmdId(cmd.getId())
+ cmdStatusBuilder.ifPresent(statusBuilder ->
+ addCmdStatus(cmd.getId(), statusBuilder
+ .setCmdId(cmd.getId())
.setStatus(Status.PENDING)
.setType(cmd.getType())
- .build());
+ .build()));
}
/**
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
index b34b352..60a0255 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
@@ -16,7 +16,6 @@
*/
package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
-import com.google.protobuf.InvalidProtocolBufferException;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.datanode.proto.ContainerProtos;
import org.apache.hadoop.hdds.protocol.datanode.proto
@@ -31,6 +30,7 @@ import org.apache.hadoop.ozone.container.common.statemachine
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.util.Time;
import org.apache.ratis.protocol.NotLeaderException;
@@ -68,61 +68,57 @@ public class CloseContainerCommandHandler implements CommandHandler {
@Override
public void handle(SCMCommand command, OzoneContainer ozoneContainer,
StateContext context, SCMConnectionManager connectionManager) {
+ LOG.debug("Processing Close Container command.");
+ invocationCount++;
+ final long startTime = Time.monotonicNow();
+ final DatanodeDetails datanodeDetails = context.getParent()
+ .getDatanodeDetails();
+ final CloseContainerCommandProto closeCommand =
+ ((CloseContainerCommand)command).getProto();
+ final ContainerController controller = ozoneContainer.getController();
+ final long containerId = closeCommand.getContainerID();
try {
- LOG.debug("Processing Close Container command.");
- invocationCount++;
- final long startTime = Time.monotonicNow();
- final DatanodeDetails datanodeDetails = context.getParent()
- .getDatanodeDetails();
- final CloseContainerCommandProto closeCommand =
- CloseContainerCommandProto.parseFrom(command.getProtoBufMessage());
- final ContainerController controller = ozoneContainer.getController();
- final long containerId = closeCommand.getContainerID();
- try {
- final Container container = controller.getContainer(containerId);
+ final Container container = controller.getContainer(containerId);
- if (container == null) {
- LOG.error("Container #{} does not exist in datanode. "
- + "Container close failed.", containerId);
- return;
- }
+ if (container == null) {
+ LOG.error("Container #{} does not exist in datanode. "
+ + "Container close failed.", containerId);
+ return;
+ }
- // Move the container to CLOSING state
- controller.markContainerForClose(containerId);
+ // Move the container to CLOSING state
+ controller.markContainerForClose(containerId);
- // If the container is part of open pipeline, close it via write channel
- if (ozoneContainer.getWriteChannel()
- .isExist(closeCommand.getPipelineID())) {
- if (closeCommand.getForce()) {
- LOG.warn("Cannot force close a container when the container is" +
- " part of an active pipeline.");
- return;
- }
- ContainerCommandRequestProto request =
- getContainerCommandRequestProto(datanodeDetails,
- closeCommand.getContainerID());
- ozoneContainer.getWriteChannel().submitRequest(
- request, closeCommand.getPipelineID());
+ // If the container is part of open pipeline, close it via write channel
+ if (ozoneContainer.getWriteChannel()
+ .isExist(closeCommand.getPipelineID())) {
+ if (closeCommand.getForce()) {
+ LOG.warn("Cannot force close a container when the container is" +
+ " part of an active pipeline.");
return;
}
- // If we reach here, there is no active pipeline for this container.
- if (!closeCommand.getForce()) {
- // QUASI_CLOSE the container.
- controller.quasiCloseContainer(containerId);
- } else {
- // SCM told us to force close the container.
- controller.closeContainer(containerId);
- }
- } catch (NotLeaderException e) {
- LOG.debug("Follower cannot close container #{}.", containerId);
- } catch (IOException e) {
- LOG.error("Can't close container #{}", containerId, e);
- } finally {
- long endTime = Time.monotonicNow();
- totalTime += endTime - startTime;
+ ContainerCommandRequestProto request =
+ getContainerCommandRequestProto(datanodeDetails,
+ closeCommand.getContainerID());
+ ozoneContainer.getWriteChannel().submitRequest(
+ request, closeCommand.getPipelineID());
+ return;
+ }
+ // If we reach here, there is no active pipeline for this container.
+ if (!closeCommand.getForce()) {
+ // QUASI_CLOSE the container.
+ controller.quasiCloseContainer(containerId);
+ } else {
+ // SCM told us to force close the container.
+ controller.closeContainer(containerId);
}
- } catch (InvalidProtocolBufferException ex) {
- LOG.error("Exception while closing container", ex);
+ } catch (NotLeaderException e) {
+ LOG.debug("Follower cannot close container #{}.", containerId);
+ } catch (IOException e) {
+ LOG.error("Can't close container #{}", containerId, e);
+ } finally {
+ long endTime = Time.monotonicNow();
+ totalTime += endTime - startTime;
}
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java
new file mode 100644
index 0000000..2842b1a
--- /dev/null
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+
+package org.apache.hadoop.ozone.container.common.statemachine.commandhandler;
+
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.SCMCommandProto;
+import org.apache.hadoop.ozone.container.common.statemachine
+ .SCMConnectionManager;
+import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
+import org.apache.hadoop.ozone.container.ozoneimpl.ContainerController;
+import org.apache.hadoop.ozone.container.ozoneimpl.OzoneContainer;
+import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
+import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
+import org.apache.hadoop.util.Time;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Handler to process the DeleteContainerCommand from SCM.
+ */
+public class DeleteContainerCommandHandler implements CommandHandler {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(DeleteContainerCommandHandler.class);
+
+ private int invocationCount;
+ private long totalTime;
+
+ @Override
+ public void handle(final SCMCommand command,
+ final OzoneContainer ozoneContainer,
+ final StateContext context,
+ final SCMConnectionManager connectionManager) {
+ final long startTime = Time.monotonicNow();
+ invocationCount++;
+ try {
+ final DeleteContainerCommand deleteContainerCommand =
+ (DeleteContainerCommand) command;
+ final ContainerController controller = ozoneContainer.getController();
+ controller.deleteContainer(deleteContainerCommand.getContainerID());
+ updateCommandStatus(context, command,
+ (cmdStatus) -> cmdStatus.setStatus(true), LOG);
+ } catch (IOException e) {
+ updateCommandStatus(context, command,
+ (cmdStatus) -> cmdStatus.setStatus(false), LOG);
+ LOG.error("Exception occurred while deleting the container.", e);
+ } finally {
+ totalTime += Time.monotonicNow() - startTime;
+ }
+
+ }
+
+ @Override
+ public SCMCommandProto.Type getCommandType() {
+ return SCMCommandProto.Type.deleteContainerCommand;
+ }
+
+ @Override
+ public int getInvocationCount() {
+ return this.invocationCount;
+ }
+
+ @Override
+ public long getAverageRunTime() {
+ return invocationCount == 0 ? 0 : totalTime / invocationCount;
+ }
+}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
index 0c0f1af..513043f 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/states/endpoint/HeartbeatEndpointTask.java
@@ -47,6 +47,7 @@ import org.apache.hadoop.ozone.container.common.statemachine
import org.apache.hadoop.ozone.container.common.statemachine.StateContext;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.DeleteBlocksCommand;
+import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import org.slf4j.Logger;
@@ -294,6 +295,16 @@ public class HeartbeatEndpointTask
}
this.context.addCommand(replicateContainerCommand);
break;
+ case deleteContainerCommand:
+ DeleteContainerCommand deleteContainerCommand =
+ DeleteContainerCommand.getFromProtobuf(
+ commandResponseProto.getDeleteContainerCommandProto());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Received SCM delete container request for container {}",
+ deleteContainerCommand.getContainerID());
+ }
+ this.context.addCommand(deleteContainerCommand);
+ break;
default:
throw new IllegalArgumentException("Unknown response : "
+ commandResponseProto.getCommandType().name());
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
index 01964ba..261dbc4 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/keyvalue/KeyValueHandler.java
@@ -240,6 +240,7 @@ public class KeyValueHandler extends Handler {
if (containerSet.getContainer(containerID) == null) {
newContainer.create(volumeSet, volumeChoosingPolicy, scmID);
containerSet.addContainer(newContainer);
+ sendICR(newContainer);
} else {
// The create container request for an already existing container can
@@ -335,37 +336,10 @@ public class KeyValueHandler extends Handler {
}
boolean forceDelete = request.getDeleteContainer().getForceDelete();
- kvContainer.writeLock();
try {
- // Check if container is open
- if (kvContainer.getContainerData().isOpen()) {
- kvContainer.writeUnlock();
- throw new StorageContainerException(
- "Deletion of Open Container is not allowed.",
- DELETE_ON_OPEN_CONTAINER);
- } else if (!forceDelete && kvContainer.getContainerData().getKeyCount()
- > 0) {
- // If the container is not empty and cannot be deleted forcibly,
- // then throw a SCE to stop deleting.
- kvContainer.writeUnlock();
- throw new StorageContainerException(
- "Container cannot be deleted because it is not empty.",
- ContainerProtos.Result.ERROR_CONTAINER_NOT_EMPTY);
- } else {
- long containerId = kvContainer.getContainerData().getContainerID();
- containerSet.removeContainer(containerId);
- // Release the lock first.
- // Avoid holding write locks for disk operations
- kvContainer.writeUnlock();
-
- kvContainer.delete(forceDelete);
- }
+ deleteInternal(kvContainer, forceDelete);
} catch (StorageContainerException ex) {
return ContainerUtils.logAndReturnError(LOG, ex, request);
- } finally {
- if (kvContainer.hasWriteLock()) {
- kvContainer.writeUnlock();
- }
}
return ContainerUtils.getSuccessResponse(request);
}
@@ -823,6 +797,7 @@ public class KeyValueHandler extends Handler {
populateContainerPathFields(container, maxSize);
container.importContainerData(rawContainerStream, packer);
+ sendICR(container);
return container;
}
@@ -877,4 +852,35 @@ public class KeyValueHandler extends Handler {
container.close();
sendICR(container);
}
+
+ @Override
+ public void deleteContainer(Container container) throws IOException {
+ deleteInternal(container, true);
+ }
+
+ private void deleteInternal(Container container, boolean force)
+ throws StorageContainerException {
+ container.writeLock();
+ try {
+ // Check if container is open
+ if (container.getContainerData().isOpen()) {
+ throw new StorageContainerException(
+ "Deletion of Open Container is not allowed.",
+ DELETE_ON_OPEN_CONTAINER);
+ }
+ if (!force && container.getContainerData().getKeyCount() > 0) {
+ // If the container is not empty and cannot be deleted forcibly,
+ // then throw a SCE to stop deleting.
+ throw new StorageContainerException(
+ "Container cannot be deleted because it is not empty.",
+ ContainerProtos.Result.ERROR_CONTAINER_NOT_EMPTY);
+ }
+ long containerId = container.getContainerData().getContainerID();
+ containerSet.removeContainer(containerId);
+ } finally {
+ container.writeUnlock();
+ }
+ // Avoid holding write locks for disk operations
+ container.delete(force);
+ }
}
\ No newline at end of file
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
index 1a918ed..4dedd1f 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/ozoneimpl/ContainerController.java
@@ -94,7 +94,7 @@ public class ContainerController {
}
/**
- * Closes a container given its id.
+ * Closes a container given its Id.
*
* @param containerId Id of the container to close
* @throws IOException in case of exception
@@ -114,6 +114,16 @@ public class ContainerController {
}
/**
+ * Deletes a container given its Id.
+ * @param containerId Id of the container to be deleted
+ * @throws IOException
+ */
+ public void deleteContainer(final long containerId) throws IOException {
+ final Container container = containerSet.getContainer(containerId);
+ getHandler(container).deleteContainer(container);
+ }
+
+ /**
* Given a container, returns its handler instance.
*
* @param container Container
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java
index 96c22ac..ded0464 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/CloseContainerCommand.java
@@ -54,16 +54,7 @@ public class CloseContainerCommand
return SCMCommandProto.Type.closeContainerCommand;
}
- /**
- * Gets the protobuf message of this object.
- *
- * @return A protobuf message.
- */
@Override
- public byte[] getProtoBufMessage() {
- return getProto().toByteArray();
- }
-
public CloseContainerCommandProto getProto() {
return CloseContainerCommandProto.newBuilder()
.setContainerID(getId())
@@ -84,4 +75,8 @@ public class CloseContainerCommand
public long getContainerID() {
return getId();
}
+
+ public PipelineID getPipelineID() {
+ return pipelineID;
+ }
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java
index 07feeff..03a876c 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteBlocksCommand.java
@@ -56,17 +56,13 @@ public class DeleteBlocksCommand extends
return SCMCommandProto.Type.deleteBlocksCommand;
}
- @Override
- public byte[] getProtoBufMessage() {
- return getProto().toByteArray();
- }
-
public static DeleteBlocksCommand getFromProtobuf(
DeleteBlocksCommandProto deleteBlocksProto) {
return new DeleteBlocksCommand(deleteBlocksProto
.getDeletedBlocksTransactionsList(), deleteBlocksProto.getCmdId());
}
+ @Override
public DeleteBlocksCommandProto getProto() {
return DeleteBlocksCommandProto.newBuilder()
.setCmdId(getId())
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReregisterCommand.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteContainerCommand.java
similarity index 50%
copy from
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReregisterCommand.java
copy to
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteContainerCommand.java
index 09f361d..8e0b172 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReregisterCommand.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/DeleteContainerCommand.java
@@ -15,53 +15,48 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
+
package org.apache.hadoop.ozone.protocol.commands;
+import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
-
-import static org.apache.hadoop.hdds.protocol.proto
- .StorageContainerDatanodeProtocolProtos.ReregisterCommandProto;
+import org.apache.hadoop.hdds.protocol.proto
+ .StorageContainerDatanodeProtocolProtos.DeleteContainerCommandProto;
/**
- * Informs a datanode to register itself with SCM again.
+ * SCM command which tells the datanode to delete a container.
*/
-public class ReregisterCommand extends
- SCMCommand<ReregisterCommandProto>{
+public class DeleteContainerCommand extends
+ SCMCommand<DeleteContainerCommandProto> {
+
+ private final long containerId;
+
+ public DeleteContainerCommand(long containerId) {
+ this.containerId = containerId;
+ }
- /**
- * Returns the type of this command.
- *
- * @return Type
- */
@Override
public SCMCommandProto.Type getType() {
- return SCMCommandProto.Type.reregisterCommand;
+ return SCMCommandProto.Type.deleteContainerCommand;
}
- /**
- * Gets the protobuf message of this object.
- *
- * @return A protobuf message.
- */
@Override
- public byte[] getProtoBufMessage() {
- return getProto().toByteArray();
+ public DeleteContainerCommandProto getProto() {
+ DeleteContainerCommandProto.Builder builder =
+ DeleteContainerCommandProto.newBuilder();
+ builder.setCmdId(getId())
+ .setContainerID(getContainerID());
+ return builder.build();
}
- /**
- * Not implemented for ReregisterCommand.
- *
- * @return cmdId.
- */
- @Override
- public long getId() {
- return 0;
+ public long getContainerID() {
+ return containerId;
}
- public ReregisterCommandProto getProto() {
- return ReregisterCommandProto
- .newBuilder()
- .build();
+ public static DeleteContainerCommand getFromProtobuf(
+ DeleteContainerCommandProto protoMessage) {
+ Preconditions.checkNotNull(protoMessage);
+ return new DeleteContainerCommand(protoMessage.getContainerID());
}
}
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReplicateContainerCommand.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReplicateContainerCommand.java
index 8530285..e663bed 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReplicateContainerCommand.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReplicateContainerCommand.java
@@ -63,10 +63,6 @@ public class ReplicateContainerCommand
}
@Override
- public byte[] getProtoBufMessage() {
- return getProto().toByteArray();
- }
-
public ReplicateContainerCommandProto getProto() {
Builder builder = ReplicateContainerCommandProto.newBuilder()
.setCmdId(getId())
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReregisterCommand.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReregisterCommand.java
index 09f361d..e3ea4ae 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReregisterCommand.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/ReregisterCommand.java
@@ -40,16 +40,6 @@ public class ReregisterCommand extends
}
/**
- * Gets the protobuf message of this object.
- *
- * @return A protobuf message.
- */
- @Override
- public byte[] getProtoBufMessage() {
- return getProto().toByteArray();
- }
-
- /**
* Not implemented for ReregisterCommand.
*
* @return cmdId.
@@ -59,6 +49,7 @@ public class ReregisterCommand extends
return 0;
}
+ @Override
public ReregisterCommandProto getProto() {
return ReregisterCommandProto
.newBuilder()
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java
index 5773bf1..3c4e05b 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java
@@ -49,7 +49,7 @@ public abstract class SCMCommand<T extends GeneratedMessage> implements
* Gets the protobuf message of this object.
* @return A protobuf message.
*/
- public abstract byte[] getProtoBufMessage();
+ public abstract T getProto();
/**
* Gets the commandId of this object.
diff --git
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java
index 511feac..fdd7af8 100644
---
a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java
+++
b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerCommandHandler.java
@@ -70,6 +70,8 @@ public class TestCloseContainerCommandHandler {
try {
final Container container =
createContainer(conf, datanodeDetails, ozoneContainer);
+ Mockito.verify(context.getParent(),
+ Mockito.times(1)).triggerHeartbeat();
final long containerId = container.getContainerData().getContainerID();
final PipelineID pipelineId = PipelineID.valueOf(UUID.fromString(
container.getContainerData().getOriginPipelineId()));
@@ -88,7 +90,7 @@ public class TestCloseContainerCommandHandler {
.getContainerState());
Mockito.verify(context.getParent(),
- Mockito.times(2)).triggerHeartbeat();
+ Mockito.times(3)).triggerHeartbeat();
} finally {
ozoneContainer.stop();
}
@@ -105,6 +107,8 @@ public class TestCloseContainerCommandHandler {
try {
final Container container =
createContainer(conf, datanodeDetails, ozoneContainer);
+ Mockito.verify(context.getParent(),
+ Mockito.times(1)).triggerHeartbeat();
final long containerId = container.getContainerData().getContainerID();
// To quasi close specify a pipeline which doesn't exist in the datanode.
final PipelineID pipelineId = PipelineID.randomId();
@@ -122,7 +126,7 @@ public class TestCloseContainerCommandHandler {
.getContainerState());
Mockito.verify(context.getParent(),
- Mockito.times(2)).triggerHeartbeat();
+ Mockito.times(3)).triggerHeartbeat();
} finally {
ozoneContainer.stop();
}
@@ -138,6 +142,8 @@ public class TestCloseContainerCommandHandler {
try {
final Container container =
createContainer(conf, datanodeDetails, ozoneContainer);
+ Mockito.verify(context.getParent(),
+ Mockito.times(1)).triggerHeartbeat();
final long containerId = container.getContainerData().getContainerID();
// A pipeline which doesn't exist in the datanode.
final PipelineID pipelineId = PipelineID.randomId();
@@ -155,7 +161,7 @@ public class TestCloseContainerCommandHandler {
.getContainerState());
Mockito.verify(context.getParent(),
- Mockito.times(2)).triggerHeartbeat();
+ Mockito.times(3)).triggerHeartbeat();
// The container is quasi closed. Force close the container now.
final CloseContainerCommand closeCommand = new CloseContainerCommand(
@@ -168,7 +174,7 @@ public class TestCloseContainerCommandHandler {
.getContainerState());
Mockito.verify(context.getParent(),
- Mockito.times(3)).triggerHeartbeat();
+ Mockito.times(4)).triggerHeartbeat();
} finally {
ozoneContainer.stop();
}
@@ -184,6 +190,8 @@ public class TestCloseContainerCommandHandler {
try {
final Container container =
createContainer(conf, datanodeDetails, ozoneContainer);
+ Mockito.verify(context.getParent(),
+ Mockito.times(1)).triggerHeartbeat();
final long containerId = container.getContainerData().getContainerID();
// A pipeline which doesn't exist in the datanode.
final PipelineID pipelineId = PipelineID.randomId();
@@ -201,7 +209,7 @@ public class TestCloseContainerCommandHandler {
.getContainerState());
Mockito.verify(context.getParent(),
- Mockito.times(2)).triggerHeartbeat();
+ Mockito.times(3)).triggerHeartbeat();
} finally {
ozoneContainer.stop();
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
index 53dfc5a..0ef02a3 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/command/CommandStatusReportHandler.java
@@ -20,6 +20,7 @@ package org.apache.hadoop.hdds.scm.command;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.CommandStatus;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
.CommandStatusReportFromDatanode;
@@ -57,6 +58,11 @@ public class CommandStatusReportHandler implements
case replicateContainerCommand:
publisher.fireEvent(SCMEvents.REPLICATION_STATUS, new
ReplicationStatus(cmdStatus));
+ if (cmdStatus.getStatus() == CommandStatus.Status.EXECUTED) {
+ publisher.fireEvent(SCMEvents.REPLICATION_COMPLETE,
+ new ReplicationManager.ReplicationCompleted(
+ cmdStatus.getCmdId()));
+ }
break;
case deleteBlocksCommand:
if (cmdStatus.getStatus() == CommandStatus.Status.EXECUTED) {
@@ -64,6 +70,13 @@ public class CommandStatusReportHandler implements
new DeleteBlockStatus(cmdStatus));
}
break;
+ case deleteContainerCommand:
+ if (cmdStatus.getStatus() == CommandStatus.Status.EXECUTED) {
+ publisher.fireEvent(SCMEvents.DELETE_CONTAINER_COMMAND_COMPLETE,
+ new ReplicationManager.DeleteContainerCommandCompleted(
+ cmdStatus.getCmdId()));
+ }
+ break;
default:
LOGGER.debug("CommandStatus of type:{} not handled in " +
"CommandStatusReportHandler.", cmdStatus.getType());
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/DeleteContainerCommandWatcher.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/DeleteContainerCommandWatcher.java
new file mode 100644
index 0000000..0b1e4c8
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/DeleteContainerCommandWatcher.java
@@ -0,0 +1,56 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.container;
+
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
+ .DeletionRequestToRepeat;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
+ .DeleteContainerCommandCompleted;
+import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.server.events.Event;
+import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdds.server.events.EventWatcher;
+import org.apache.hadoop.ozone.lease.LeaseManager;
+
+/**
+ * Command watcher to track the delete container commands.
+ */
+public class DeleteContainerCommandWatcher extends
+ EventWatcher<DeletionRequestToRepeat, DeleteContainerCommandCompleted> {
+
+ public DeleteContainerCommandWatcher(
+ Event<DeletionRequestToRepeat> startEvent,
+ Event<DeleteContainerCommandCompleted> completionEvent,
+ LeaseManager<Long> leaseManager) {
+ super(startEvent, completionEvent, leaseManager);
+ }
+
+ @Override
+ protected void onTimeout(EventPublisher publisher,
+ DeletionRequestToRepeat payload) {
+ // Timed out: re-fire the embedded request as REPLICATE_CONTAINER.
+ publisher.fireEvent(SCMEvents.REPLICATE_CONTAINER,
+ payload.getRequest());
+ }
+
+
+ @Override
+ protected void onFinished(EventPublisher publisher,
+ DeletionRequestToRepeat payload) {
+ // Intentionally empty: no follow-up action when the command finishes.
+ }
+}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReportHandlerHelper.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReportHandlerHelper.java
index d9c3090..c566ca9 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReportHandlerHelper.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReportHandlerHelper.java
@@ -204,10 +204,7 @@ public final class ReportHandlerHelper {
.distinct()
.count();
- float quasiClosePercent = ((float) uniqueQuasiClosedReplicaCount) /
- ((float) replicationFactor);
-
- if (quasiClosePercent > 0.5F) {
+ if (uniqueQuasiClosedReplicaCount > (replicationFactor / 2)) {
// Quorum of unique replica has been QUASI_CLOSED
long sequenceId = forceCloseContainerReplicaWithHighestSequenceId(
container, quasiClosedReplicas, publisher);
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
index e700ecd..d65e45f 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ReplicationManager.java
@@ -18,18 +18,24 @@ package org.apache.hadoop.hdds.scm.container.replication;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.LinkedHashMap;
import java.util.List;
+import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.UUID;
import java.util.concurrent.ThreadFactory;
import java.util.stream.Collectors;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
+import org.apache.hadoop.hdds.scm.container.DeleteContainerCommandWatcher;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
.ContainerPlacementPolicy;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
@@ -38,11 +44,14 @@ import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.hdds.server.events.IdentifiableEventPayload;
import org.apache.hadoop.ozone.lease.LeaseManager;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
+import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import com.google.common.annotations.VisibleForTesting;
-import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
+
+import static org.apache.hadoop.hdds.scm.events.SCMEvents
+ .TRACK_DELETE_CONTAINER_COMMAND;
import static org.apache.hadoop.hdds.scm.events.SCMEvents
.TRACK_REPLICATE_COMMAND;
import org.slf4j.Logger;
@@ -63,6 +72,7 @@ public class ReplicationManager implements Runnable {
private EventPublisher eventPublisher;
private ReplicationCommandWatcher replicationCommandWatcher;
+ private DeleteContainerCommandWatcher deleteContainerCommandWatcher;
private boolean running = true;
@@ -80,6 +90,11 @@ public class ReplicationManager implements Runnable {
new ReplicationCommandWatcher(TRACK_REPLICATE_COMMAND,
SCMEvents.REPLICATION_COMPLETE, commandWatcherLeaseManager);
+ this.deleteContainerCommandWatcher =
+ new DeleteContainerCommandWatcher(TRACK_DELETE_CONTAINER_COMMAND,
+ SCMEvents.DELETE_CONTAINER_COMMAND_COMPLETE,
+ commandWatcherLeaseManager);
+
this.replicationQueue = new ReplicationQueue();
eventQueue.addHandler(SCMEvents.REPLICATE_CONTAINER,
@@ -108,15 +123,15 @@ public class ReplicationManager implements Runnable {
request = replicationQueue.take();
ContainerID containerID = new ContainerID(request.getContainerId());
- ContainerInfo containerInfo =
- containerManager.getContainer(containerID);
-
- Preconditions.checkNotNull(containerInfo,
- "No information about the container " + request.getContainerId());
-
- Preconditions
- .checkState(containerInfo.getState() == LifeCycleState.CLOSED,
- "Container should be in closed state");
+ ContainerInfo container = containerManager.getContainer(containerID);
+ final HddsProtos.LifeCycleState state = container.getState();
+
+ if (state != LifeCycleState.CLOSED &&
+ state != LifeCycleState.QUASI_CLOSED) {
+ LOG.warn("Cannot replicate the container {} when in {} state.",
+ containerID, state);
+ continue;
+ }
//check the current replication
List<ContainerReplica> containerReplicas =
@@ -130,28 +145,41 @@ public class ReplicationManager implements Runnable {
return;
}
- ReplicationRequest finalRequest = request;
+ final ReplicationRequest finalRequest = request;
int inFlightReplications = replicationCommandWatcher.getTimeoutEvents(
- e -> e.request.getContainerId() == finalRequest.getContainerId())
+ e -> e.getRequest().getContainerId()
+ == finalRequest.getContainerId())
+ .size();
+
+ int inFlightDelete = deleteContainerCommandWatcher.getTimeoutEvents(
+ e -> e.getRequest().getContainerId()
+ == finalRequest.getContainerId())
.size();
int deficit =
- request.getExpecReplicationCount() - containerReplicas.size()
- - inFlightReplications;
+ (request.getExpecReplicationCount() - containerReplicas.size())
+ - (inFlightReplications - inFlightDelete);
if (deficit > 0) {
List<DatanodeDetails> datanodes = containerReplicas.stream()
+ .sorted((r1, r2) ->
+ r2.getSequenceId().compareTo(r1.getSequenceId()))
.map(ContainerReplica::getDatanodeDetails)
.collect(Collectors.toList());
List<DatanodeDetails> selectedDatanodes = containerPlacement
- .chooseDatanodes(datanodes, deficit,
- containerInfo.getUsedBytes());
+ .chooseDatanodes(datanodes, deficit, container.getUsedBytes());
//send the command
for (DatanodeDetails datanode : selectedDatanodes) {
+ LOG.info("Container {} is under replicated." +
+ " Expected replica count is {}, but found {}." +
+ " Re-replicating it on {}.",
+ container.containerID(), request.getExpecReplicationCount(),
+ containerReplicas.size(), datanode);
+
ReplicateContainerCommand replicateCommand =
new ReplicateContainerCommand(containerID.getId(), datanodes);
@@ -168,8 +196,62 @@ public class ReplicationManager implements Runnable {
}
} else if (deficit < 0) {
- //TODO: too many replicas. Not handled yet.
- LOG.debug("Too many replicas is not handled yet.");
+
+ int numberOfReplicasToDelete = Math.abs(deficit);
+
+ final Map<UUID, List<DatanodeDetails>> originIdToDnMap =
+ new LinkedHashMap<>();
+
+ containerReplicas.stream()
+ .sorted(Comparator.comparing(ContainerReplica::getSequenceId))
+ .forEach(replica -> {
+ originIdToDnMap.computeIfAbsent(
+ replica.getOriginDatanodeId(), key -> new ArrayList<>());
+ originIdToDnMap.get(replica.getOriginDatanodeId())
+ .add(replica.getDatanodeDetails());
+ });
+
+ for(UUID originId : originIdToDnMap.keySet()) {
+ final List<DatanodeDetails> listOfReplica =
+ originIdToDnMap.get(originId);
+ if (listOfReplica.size() > 1) {
+ final int toDelete = Math.min(listOfReplica.size() - 1,
+ numberOfReplicasToDelete);
+ final DeleteContainerCommand deleteContainer =
+ new DeleteContainerCommand(containerID.getId());
+ for (int i = 0; i < toDelete; i++) {
+ LOG.info("Container {} is over replicated." +
+ " Expected replica count is {}, but found {}." +
+ " Deleting the replica on {}.",
+ container.containerID(), request.getExpecReplicationCount(),
+ containerReplicas.size(), listOfReplica.get(i));
+ eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
+ new CommandForDatanode<>(listOfReplica.get(i).getUuid(),
+ deleteContainer));
+ DeletionRequestToRepeat timeoutEvent =
+ new DeletionRequestToRepeat(deleteContainer.getId(),
+ request);
+
+ eventPublisher.fireEvent(
+ TRACK_DELETE_CONTAINER_COMMAND, timeoutEvent);
+ }
+ numberOfReplicasToDelete -= toDelete;
+ }
+ if (numberOfReplicasToDelete == 0) {
+ break;
+ }
+ }
+
+ if (numberOfReplicasToDelete != 0) {
+ final int expectedReplicaCount = container
+ .getReplicationFactor().getNumber();
+
+ LOG.warn("Not able to delete the container replica of Container" +
+ " {} even though it is over replicated. Expected replica" +
+ " count is {}, current replica count is {}.",
+ containerID, expectedReplicaCount,
+ expectedReplicaCount + numberOfReplicasToDelete);
+ }
}
} catch (Exception e) {
@@ -196,17 +278,43 @@ public class ReplicationManager implements Runnable {
}
/**
- * Event for the ReplicationCommandWatcher to repeate the embedded request.
+ * Event for the ReplicationCommandWatcher to repeat the embedded request.
* in case fof timeout.
*/
public static class ReplicationRequestToRepeat
+ extends ContainerRequestToRepeat {
+
+ public ReplicationRequestToRepeat(
+ long commandId, ReplicationRequest request) {
+ super(commandId, request);
+ }
+ }
+
+ /**
+ * Event for the DeleteContainerCommandWatcher to repeat the
+ * embedded request in case of timeout.
+ */
+ public static class DeletionRequestToRepeat
+ extends ContainerRequestToRepeat {
+
+ public DeletionRequestToRepeat(
+ long commandId, ReplicationRequest request) {
+ super(commandId, request);
+ }
+ }
+
+ /**
+ * Container Request wrapper which will be used by ReplicationManager to
+ * perform the intended operation.
+ */
+ public static class ContainerRequestToRepeat
implements IdentifiableEventPayload {
private final long commandId;
private final ReplicationRequest request;
- public ReplicationRequestToRepeat(long commandId,
+ ContainerRequestToRepeat(long commandId,
ReplicationRequest request) {
this.commandId = commandId;
this.request = request;
@@ -229,7 +337,7 @@ public class ReplicationManager implements Runnable {
if (o == null || getClass() != o.getClass()) {
return false;
}
- ReplicationRequestToRepeat that = (ReplicationRequestToRepeat) o;
+ ContainerRequestToRepeat that = (ContainerRequestToRepeat) o;
return Objects.equals(request, that.request);
}
@@ -241,7 +349,7 @@ public class ReplicationManager implements Runnable {
}
/**
- * Add javadoc.
+ * Event which indicates that the replicate operation is completed.
*/
public static class ReplicationCompleted
implements IdentifiableEventPayload {
@@ -257,4 +365,22 @@ public class ReplicationManager implements Runnable {
return uuid;
}
}
+
+ /**
+ * Event which indicates that the container deletion operation is completed.
+ */
+ public static class DeleteContainerCommandCompleted
+ implements IdentifiableEventPayload {
+
+ private final long uuid;
+
+ public DeleteContainerCommandCompleted(long uuid) {
+ this.uuid = uuid;
+ }
+
+ @Override
+ public long getId() {
+ return uuid;
+ }
+ }
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
index 72d416b..51e1306 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
@@ -41,6 +41,8 @@ import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher
.NodeReportFromDatanode;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
+ .DeleteContainerCommandCompleted;
+import org.apache.hadoop.hdds.scm.container.replication.ReplicationManager
.ReplicationCompleted;
import org.apache.hadoop.hdds.scm.container.replication.ReplicationRequest;
@@ -204,6 +206,14 @@ public final class SCMEvents {
public static final TypedEvent<ReplicationManager.ReplicationRequestToRepeat>
TRACK_REPLICATE_COMMAND =
new TypedEvent<>(ReplicationManager.ReplicationRequestToRepeat.class);
+
+ /**
+ * This event is sent by the ReplicationManager to the
+ * DeleteContainerCommandWatcher to track the in-progress delete commands.
+ */
+ public static final TypedEvent<ReplicationManager.DeletionRequestToRepeat>
+ TRACK_DELETE_CONTAINER_COMMAND =
+ new TypedEvent<>(ReplicationManager.DeletionRequestToRepeat.class);
/**
* This event comes from the Heartbeat dispatcher (in fact from the
* datanode) to notify the scm that the replication is done. This is
@@ -216,6 +226,10 @@ public final class SCMEvents {
public static final TypedEvent<ReplicationCompleted> REPLICATION_COMPLETE =
new TypedEvent<>(ReplicationCompleted.class);
+ public static final TypedEvent<DeleteContainerCommandCompleted>
+ DELETE_CONTAINER_COMMAND_COMPLETE =
+ new TypedEvent<>(DeleteContainerCommandCompleted.class);
+
/**
* Signal for all the components (but especially for the replication
* manager and container report handler) that the replication could be
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
index 0be279d..fbe2641 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
@@ -21,13 +21,10 @@ import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.List;
-import java.util.Objects;
import java.util.stream.IntStream;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.protocol.proto
@@ -35,21 +32,24 @@ import org.apache.hadoop.hdds.protocol.proto
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerManager;
+import org.apache.hadoop.hdds.scm.container.ContainerNotFoundException;
import org.apache.hadoop.hdds.scm.container.ContainerReplica;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
-import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.container.placement.algorithms
.ContainerPlacementPolicy;
import org.apache.hadoop.hdds.scm.container.replication
.ReplicationManager.ReplicationRequestToRepeat;
+import org.apache.hadoop.hdds.scm.container.replication
+ .ReplicationManager.DeletionRequestToRepeat;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.ozone.lease.LeaseManager;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
-import com.google.common.base.Preconditions;
-import static org.apache.hadoop.hdds.scm.events.SCMEvents.TRACK_REPLICATE_COMMAND;
+import static org.apache.hadoop.hdds.scm.events.SCMEvents
+ .TRACK_DELETE_CONTAINER_COMMAND;
+import static org.apache.hadoop.hdds.scm.events.SCMEvents
+ .TRACK_REPLICATE_COMMAND;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
@@ -65,6 +65,7 @@ public class TestReplicationManager {
private EventQueue queue;
private List<ReplicationRequestToRepeat> trackReplicationEvents;
+ private List<DeletionRequestToRepeat> trackDeleteEvents;
private List<CommandForDatanode<ReplicateContainerCommandProto>> copyEvents;
@@ -87,6 +88,8 @@ public class TestReplicationManager {
listOfContainerReplica.add(ContainerReplica.newBuilder()
.setContainerID(ContainerID.valueof(i))
.setContainerState(ContainerReplicaProto.State.CLOSED)
+ .setSequenceId(10000L)
+ .setOriginNodeId(dd.getUuid())
.setDatanodeDetails(dd).build());
});
@@ -119,6 +122,10 @@ public class TestReplicationManager {
queue.addHandler(TRACK_REPLICATE_COMMAND,
(event, publisher) -> trackReplicationEvents.add(event));
+ trackDeleteEvents = new ArrayList<>();
+ queue.addHandler(TRACK_DELETE_CONTAINER_COMMAND,
+ (event, publisher) -> trackDeleteEvents.add(event));
+
copyEvents = new ArrayList<>();
queue.addHandler(SCMEvents.DATANODE_COMMAND,
(event, publisher) -> copyEvents.add(event));
@@ -128,8 +135,6 @@ public class TestReplicationManager {
replicationManager = new ReplicationManager(containerPlacementPolicy,
containerManager, queue, leaseManager);
-
-
}
/**
@@ -161,6 +166,57 @@ public class TestReplicationManager {
}
@Test
+ public void testOverReplication() throws ContainerNotFoundException,
+ InterruptedException {
+ try {
+ leaseManager.start();
+ replicationManager.start();
+
+ final ContainerID containerID = ContainerID.valueof(5L);
+
+ final ContainerReplica duplicateReplicaOne = ContainerReplica.newBuilder()
+ .setContainerID(containerID)
+ .setContainerState(ContainerReplicaProto.State.CLOSED)
+ .setSequenceId(10000L)
+ .setOriginNodeId(listOfDatanodeDetails.get(0).getUuid())
+ .setDatanodeDetails(listOfDatanodeDetails.get(3))
+ .build();
+
+ final ContainerReplica duplicateReplicaTwo = ContainerReplica.newBuilder()
+ .setContainerID(containerID)
+ .setContainerState(ContainerReplicaProto.State.CLOSED)
+ .setSequenceId(10000L)
+ .setOriginNodeId(listOfDatanodeDetails.get(1).getUuid())
+ .setDatanodeDetails(listOfDatanodeDetails.get(4))
+ .build();
+
+ when(containerManager.getContainerReplicas(new ContainerID(5L)))
+ .thenReturn(new HashSet<>(Arrays.asList(
+ listOfContainerReplica.get(0),
+ listOfContainerReplica.get(1),
+ listOfContainerReplica.get(2),
+ duplicateReplicaOne,
+ duplicateReplicaTwo
+ )));
+
+ queue.fireEvent(SCMEvents.REPLICATE_CONTAINER,
+ new ReplicationRequest(5L, (short) 5, System.currentTimeMillis(),
+ (short) 3));
+ Thread.sleep(500L);
+ queue.processAll(1000L);
+
+ //THEN
+ Assert.assertEquals(2, trackDeleteEvents.size());
+ Assert.assertEquals(2, copyEvents.size());
+
+ } finally {
+ if (leaseManager != null) {
+ leaseManager.shutdown();
+ }
+ }
+ }
+
+ @Test
public void testEventSending() throws InterruptedException, IOException {
//GIVEN
@@ -196,6 +252,7 @@ public class TestReplicationManager {
containerManager, queue, rapidLeaseManager);
try {
+ leaseManager.start();
rapidLeaseManager.start();
replicationManager.start();
@@ -223,25 +280,11 @@ public class TestReplicationManager {
Assert.assertEquals(2, copyEvents.size());
} finally {
- if (rapidLeaseManager != null) {
- rapidLeaseManager.shutdown();
+ rapidLeaseManager.shutdown();
+ if (leaseManager != null) {
+ leaseManager.shutdown();
}
}
}
- public static Pipeline createPipeline(Iterable<DatanodeDetails> ids)
- throws IOException {
- Objects.requireNonNull(ids, "ids == null");
- Preconditions.checkArgument(ids.iterator().hasNext());
- List<DatanodeDetails> dns = new ArrayList<>();
- ids.forEach(dns::add);
- return Pipeline.newBuilder()
- .setState(Pipeline.PipelineState.OPEN)
- .setId(PipelineID.randomId())
- .setType(HddsProtos.ReplicationType.STAND_ALONE)
- .setFactor(ReplicationFactor.ONE)
- .setNodes(dns)
- .build();
- }
-
}
\ No newline at end of file
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
index cd0b060..d58466f 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/ozoneimpl/TestOzoneContainer.java
@@ -75,13 +75,12 @@ public class TestOzoneContainer {
conf.setBoolean(
OzoneConfigKeys.DFS_CONTAINER_IPC_RANDOM_PORT, false);
- DatanodeDetails datanodeDetails = Mockito.mock(DatanodeDetails.class);
+ DatanodeDetails datanodeDetails = TestUtils.randomDatanodeDetails();
StateContext context = Mockito.mock(StateContext.class);
DatanodeStateMachine dsm = Mockito.mock(DatanodeStateMachine.class);
Mockito.when(dsm.getDatanodeDetails()).thenReturn(datanodeDetails);
Mockito.when(context.getParent()).thenReturn(dsm);
- container = new OzoneContainer(TestUtils.randomDatanodeDetails(),
- conf, context);
+ container = new OzoneContainer(datanodeDetails, conf, context);
//Setting scmId, as we start manually ozone container.
container.getDispatcher().setScmId(UUID.randomUUID().toString());
container.start();
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]