This is an automated email from the ASF dual-hosted git repository.
adoroszlai pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 75bbba30a8 HDDS-12552. Fix raw use of generic class SCMCommand (#8048)
75bbba30a8 is described below
commit 75bbba30a8d5aa9277a342a9d10d0aac031c830e
Author: Nandakumar Vadivelu <[email protected]>
AuthorDate: Wed Mar 12 22:05:25 2025 +0530
HDDS-12552. Fix raw use of generic class SCMCommand (#8048)
---
.../common/statemachine/StateContext.java | 10 +++---
.../CloseContainerCommandHandler.java | 2 +-
.../ClosePipelineCommandHandler.java | 2 +-
.../commandhandler/CommandDispatcher.java | 2 +-
.../commandhandler/CommandHandler.java | 4 +--
.../CreatePipelineCommandHandler.java | 2 +-
.../commandhandler/DeleteBlocksCommandHandler.java | 2 +-
.../DeleteContainerCommandHandler.java | 4 +--
.../FinalizeNewLayoutVersionCommandHandler.java | 2 +-
.../ReconstructECContainersCommandHandler.java | 2 +-
.../RefreshVolumeUsageCommandHandler.java | 2 +-
.../ReplicateContainerCommandHandler.java | 2 +-
.../SetNodeOperationalStateCommandHandler.java | 2 +-
.../protocol/StorageContainerNodeProtocol.java | 4 +--
.../replication/ContainerHealthResult.java | 6 ++--
.../apache/hadoop/hdds/scm/node/CommandQueue.java | 38 ++++------------------
.../hadoop/hdds/scm/node/DeadNodeHandler.java | 2 +-
.../apache/hadoop/hdds/scm/node/NodeManager.java | 4 +--
.../hadoop/hdds/scm/node/SCMNodeManager.java | 10 +++---
.../scm/server/SCMDatanodeHeartbeatDispatcher.java | 4 +--
.../hdds/scm/server/SCMDatanodeProtocolServer.java | 4 +--
.../hadoop/hdds/scm/container/MockNodeManager.java | 14 ++++----
.../hdds/scm/container/SimpleMockNodeManager.java | 6 ++--
.../replication/TestReplicationManager.java | 2 +-
.../hadoop/hdds/scm/node/TestCommandQueue.java | 2 +-
.../hadoop/hdds/scm/node/TestSCMNodeManager.java | 4 +--
.../testutils/ReplicationNodeManagerMock.java | 6 ++--
.../hdds/scm/TestStorageContainerManager.java | 4 +--
.../hadoop/ozone/recon/scm/ReconNodeManager.java | 4 +--
.../ozone/recon/scm/TestReconNodeManager.java | 4 +--
30 files changed, 66 insertions(+), 90 deletions(-)
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
index bc995854a8..91cfaa5a21 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/StateContext.java
@@ -102,7 +102,7 @@ public class StateContext {
static final Logger LOG =
LoggerFactory.getLogger(StateContext.class);
- private final Queue<SCMCommand> commandQueue;
+ private final Queue<SCMCommand<?>> commandQueue;
private final Map<Long, CommandStatus> cmdStatusMap;
private final Lock lock;
private final DatanodeStateMachine parentDatanodeStateMachine;
@@ -738,7 +738,7 @@ public OptionalLong getTermOfLeaderSCM() {
*
* @return SCMCommand or Null.
*/
- public SCMCommand getNextCommand() {
+ public SCMCommand<?> getNextCommand() {
lock.lock();
try {
initTermOfLeaderSCM();
@@ -772,7 +772,7 @@ public SCMCommand getNextCommand() {
*
* @param command - SCMCommand.
*/
- public void addCommand(SCMCommand command) {
+ public void addCommand(SCMCommand<?> command) {
lock.lock();
try {
if (commandQueue.size() >= maxCommandQueueLimit) {
@@ -792,7 +792,7 @@ public Map<SCMCommandProto.Type, Integer> getCommandQueueSummary() {
Map<SCMCommandProto.Type, Integer> summary = new HashMap<>();
lock.lock();
try {
- for (SCMCommand cmd : commandQueue) {
+ for (SCMCommand<?> cmd : commandQueue) {
summary.put(cmd.getType(), summary.getOrDefault(cmd.getType(), 0) + 1);
}
} finally {
@@ -832,7 +832,7 @@ public void addCmdStatus(Long key, CommandStatus status) {
*
* @param cmd - {@link SCMCommand}.
*/
- public void addCmdStatus(SCMCommand cmd) {
+ public void addCmdStatus(SCMCommand<?> cmd) {
if (cmd.getType() == SCMCommandProto.Type.deleteBlocksCommand) {
addCmdStatus(cmd.getId(),
DeleteBlockCommandStatusBuilder.newBuilder()
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
index 700303ee0c..1a1594cf8a 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CloseContainerCommandHandler.java
@@ -84,7 +84,7 @@ public CloseContainerCommandHandler(
* @param connectionManager - The SCMs that we are talking to.
*/
@Override
- public void handle(SCMCommand command, OzoneContainer ozoneContainer,
+ public void handle(SCMCommand<?> command, OzoneContainer ozoneContainer,
StateContext context, SCMConnectionManager connectionManager) {
queuedCount.incrementAndGet();
CompletableFuture.runAsync(() -> {
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
index 8fcd192fe5..5cbe472689 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ClosePipelineCommandHandler.java
@@ -93,7 +93,7 @@ public ClosePipelineCommandHandler(
* @param connectionManager - The SCMs that we are talking to.
*/
@Override
- public void handle(SCMCommand command, OzoneContainer ozoneContainer,
+ public void handle(SCMCommand<?> command, OzoneContainer ozoneContainer,
StateContext context, SCMConnectionManager connectionManager) {
queuedCount.incrementAndGet();
CompletableFuture.runAsync(() -> {
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
index 69a40e1f1a..696b04defe 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandDispatcher.java
@@ -85,7 +85,7 @@ public CommandHandler getDeleteBlocksCommandHandler() {
*
* @param command - SCM Command.
*/
- public void handle(SCMCommand command) {
+ public void handle(SCMCommand<?> command) {
Preconditions.checkNotNull(command);
CommandHandler handler = handlerMap.get(command.getType());
if (handler != null) {
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
index 68ab8087d6..d516977838 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CommandHandler.java
@@ -38,7 +38,7 @@ public interface CommandHandler {
* @param context - Current Context.
* @param connectionManager - The SCMs that we are talking to.
*/
- void handle(SCMCommand command, OzoneContainer container,
+ void handle(SCMCommand<?> command, OzoneContainer container,
StateContext context, SCMConnectionManager connectionManager);
/**
@@ -68,7 +68,7 @@ void handle(SCMCommand command, OzoneContainer container,
/**
* Default implementation for updating command status.
*/
- default void updateCommandStatus(StateContext context, SCMCommand command,
+ default void updateCommandStatus(StateContext context, SCMCommand<?> command,
Consumer<CommandStatus> cmdStatusUpdater, Logger log) {
if (!context.updateCommandStatus(command.getId(), cmdStatusUpdater)) {
log.warn("{} with Id:{} not found.", command.getType(),
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java
index d86c028751..30ffe7ed41 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/CreatePipelineCommandHandler.java
@@ -90,7 +90,7 @@ public CreatePipelineCommandHandler(ConfigurationSource conf,
* @param connectionManager - The SCMs that we are talking to.
*/
@Override
- public void handle(SCMCommand command, OzoneContainer ozoneContainer,
+ public void handle(SCMCommand<?> command, OzoneContainer ozoneContainer,
StateContext context, SCMConnectionManager connectionManager) {
queuedCount.incrementAndGet();
CompletableFuture.runAsync(() -> {
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
index 80c078c508..71277c0637 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteBlocksCommandHandler.java
@@ -123,7 +123,7 @@ public DeleteBlocksCommandHandler(OzoneContainer container,
}
@Override
- public void handle(SCMCommand command, OzoneContainer container,
+ public void handle(SCMCommand<?> command, OzoneContainer container,
StateContext context, SCMConnectionManager connectionManager) {
if (command.getType() != SCMCommandProto.Type.deleteBlocksCommand) {
LOG.warn("Skipping handling command, expected command "
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java
index 1d23da794a..ae036a1c8f 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/DeleteContainerCommandHandler.java
@@ -76,7 +76,7 @@ protected DeleteContainerCommandHandler(Clock clock,
this.opsLatencyMs =
registry.newRate(SCMCommandProto.Type.deleteContainerCommand + "Ms");
}
@Override
- public void handle(final SCMCommand command,
+ public void handle(final SCMCommand<?> command,
final OzoneContainer ozoneContainer,
final StateContext context,
final SCMConnectionManager connectionManager) {
@@ -93,7 +93,7 @@ public void handle(final SCMCommand command,
}
}
- private void handleInternal(SCMCommand command, StateContext context,
+ private void handleInternal(SCMCommand<?> command, StateContext context,
DeleteContainerCommand deleteContainerCommand,
ContainerController controller) {
final long startTime = Time.monotonicNow();
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/FinalizeNewLayoutVersionCommandHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/FinalizeNewLayoutVersionCommandHandler.java
index 6e1c566343..a27b94b76a 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/FinalizeNewLayoutVersionCommandHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/FinalizeNewLayoutVersionCommandHandler.java
@@ -63,7 +63,7 @@ public FinalizeNewLayoutVersionCommandHandler() {
* @param connectionManager - The SCMs that we are talking to.
*/
@Override
- public void handle(SCMCommand command, OzoneContainer ozoneContainer,
+ public void handle(SCMCommand<?> command, OzoneContainer ozoneContainer,
StateContext context, SCMConnectionManager connectionManager) {
LOG.info("Processing FinalizeNewLayoutVersionCommandHandler command.");
invocationCount.incrementAndGet();
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java
index 4366a91218..b2159aa44f 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReconstructECContainersCommandHandler.java
@@ -47,7 +47,7 @@ public ReconstructECContainersCommandHandler(ConfigurationSource conf,
}
@Override
- public void handle(SCMCommand command, OzoneContainer container,
+ public void handle(SCMCommand<?> command, OzoneContainer container,
StateContext context, SCMConnectionManager connectionManager) {
ReconstructECContainersCommand ecContainersCommand =
(ReconstructECContainersCommand) command;
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/RefreshVolumeUsageCommandHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/RefreshVolumeUsageCommandHandler.java
index bc8b69a50a..f26329792b 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/RefreshVolumeUsageCommandHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/RefreshVolumeUsageCommandHandler.java
@@ -48,7 +48,7 @@ public RefreshVolumeUsageCommandHandler() {
}
@Override
- public void handle(SCMCommand command, OzoneContainer container,
+ public void handle(SCMCommand<?> command, OzoneContainer container,
StateContext context, SCMConnectionManager connectionManager) {
LOG.info("receive command to refresh usage info of all volumes");
invocationCount.incrementAndGet();
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
index d52c51e298..17bb10fc7e 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/ReplicateContainerCommandHandler.java
@@ -65,7 +65,7 @@ public String getMetricsName() {
}
@Override
- public void handle(SCMCommand command, OzoneContainer container,
+ public void handle(SCMCommand<?> command, OzoneContainer container,
StateContext context, SCMConnectionManager connectionManager) {
final ReplicateContainerCommand replicateCommand =
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/SetNodeOperationalStateCommandHandler.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/SetNodeOperationalStateCommandHandler.java
index 548a549174..25a158bb45 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/SetNodeOperationalStateCommandHandler.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/SetNodeOperationalStateCommandHandler.java
@@ -76,7 +76,7 @@ public SetNodeOperationalStateCommandHandler(ConfigurationSource conf,
* @param connectionManager - The SCMs that we are talking to.
*/
@Override
- public void handle(SCMCommand command, OzoneContainer container,
+ public void handle(SCMCommand<?> command, OzoneContainer container,
StateContext context, SCMConnectionManager connectionManager) {
long startTime = Time.monotonicNow();
invocationCount.incrementAndGet();
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java
index 2d0ed82d90..ffe5a40fb4 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/StorageContainerNodeProtocol.java
@@ -69,7 +69,7 @@ RegisteredCommand register(DatanodeDetails datanodeDetails,
* @param datanodeDetails - Datanode ID.
* @return Commands to be sent to the datanode.
*/
- default List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails) {
+ default List<SCMCommand<?>> processHeartbeat(DatanodeDetails datanodeDetails) {
return processHeartbeat(datanodeDetails, null);
};
@@ -80,7 +80,7 @@ default List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails) {
* heartbeating datanode.
* @return Commands to be sent to the datanode.
*/
- List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails,
+ List<SCMCommand<?>> processHeartbeat(DatanodeDetails datanodeDetails,
CommandQueueReportProto queueReport);
/**
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthResult.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthResult.java
index bbcf498ec5..bf4f0b92fd 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthResult.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerHealthResult.java
@@ -40,7 +40,7 @@ public enum HealthState {
private final ContainerInfo containerInfo;
private final HealthState healthState;
- private final List<SCMCommand> commands = new ArrayList<>();
+ private final List<SCMCommand<?>> commands = new ArrayList<>();
public ContainerHealthResult(ContainerInfo containerInfo,
HealthState healthState) {
@@ -52,11 +52,11 @@ public HealthState getHealthState() {
return healthState;
}
- public void addCommand(SCMCommand command) {
+ public void addCommand(SCMCommand<?> command) {
commands.add(command);
}
- public List<SCMCommand> getCommands() {
+ public List<SCMCommand<?>> getCommands() {
return commands;
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/CommandQueue.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/CommandQueue.java
index 568328210c..f122215105 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/CommandQueue.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/CommandQueue.java
@@ -27,7 +27,6 @@
import java.util.UUID;
import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
-import org.apache.hadoop.util.Time;
/**
* Command Queue is queue of commands for the datanode.
@@ -52,8 +51,6 @@ public long getCommandsInQueue() {
/**
* Constructs a Command Queue.
- * TODO : Add a flusher thread that throws away commands older than a certain
- * time period.
*/
public CommandQueue() {
commandMap = new HashMap<>();
@@ -78,9 +75,9 @@ public void clear() {
* @return List of SCM Commands.
*/
@SuppressWarnings("unchecked")
- List<SCMCommand> getCommand(final UUID datanodeUuid) {
+ List<SCMCommand<?>> getCommand(final UUID datanodeUuid) {
Commands cmds = commandMap.remove(datanodeUuid);
- List<SCMCommand> cmdList = null;
+ List<SCMCommand<?>> cmdList = null;
if (cmds != null) {
cmdList = cmds.getCommands();
commandsInQueue -= !cmdList.isEmpty() ? cmdList.size() : 0;
@@ -134,8 +131,7 @@ public Map<SCMCommandProto.Type, Integer> getDatanodeCommandSummary(
* @param datanodeUuid DatanodeDetails.Uuid
* @param command - Command
*/
- public void addCommand(final UUID datanodeUuid, final SCMCommand
- command) {
+ public void addCommand(final UUID datanodeUuid, final SCMCommand<?> command) {
commandMap.computeIfAbsent(datanodeUuid, s -> new Commands()).add(command);
commandsInQueue++;
}
@@ -144,39 +140,20 @@ public void addCommand(final UUID datanodeUuid, final SCMCommand
* Class that stores commands for a datanode.
*/
private static class Commands {
- private long updateTime = 0;
- private long readTime = 0;
- private List<SCMCommand> commands = new ArrayList<>();
+ private List<SCMCommand<?>> commands = new ArrayList<>();
private final Map<SCMCommandProto.Type, Integer> summary = new HashMap<>();
- /**
- * Gets the last time the commands for this node was updated.
- * @return Time stamp
- */
- public long getUpdateTime() {
- return updateTime;
- }
-
- /**
- * Gets the last read time.
- * @return last time when these commands were read from this queue.
- */
- public long getReadTime() {
- return readTime;
- }
-
/**
* Adds a command to the list.
*
* @param command SCMCommand
*/
- public void add(SCMCommand command) {
+ public void add(SCMCommand<?> command) {
this.commands.add(command);
if (command.contributesToQueueSize()) {
summary.put(command.getType(),
summary.getOrDefault(command.getType(), 0) + 1);
}
- updateTime = Time.monotonicNow();
}
public int getCommandSummary(SCMCommandProto.Type commandType) {
@@ -191,11 +168,10 @@ public Map<SCMCommandProto.Type, Integer>
getAllCommandsSummary() {
* Returns the commands for this datanode.
* @return command list.
*/
- public List<SCMCommand> getCommands() {
- List<SCMCommand> temp = this.commands;
+ public List<SCMCommand<?>> getCommands() {
+ List<SCMCommand<?>> temp = this.commands;
this.commands = new ArrayList<>();
summary.clear();
- readTime = Time.monotonicNow();
return temp;
}
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
index 20dc5aea78..f582623b8c 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/DeadNodeHandler.java
@@ -98,7 +98,7 @@ public void onMessage(final DatanodeDetails datanodeDetails,
}
// remove commands in command queue for the DN
- final List<SCMCommand> cmdList = nodeManager.getCommandQueue(
+ final List<SCMCommand<?>> cmdList = nodeManager.getCommandQueue(
datanodeDetails.getUuid());
LOG.info("Clearing command queue of size {} for DN {}",
cmdList.size(), datanodeDetails);
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
index e6a74b395f..275665ec38 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NodeManager.java
@@ -282,7 +282,7 @@ Set<ContainerID> getContainers(DatanodeDetails
datanodeDetails)
* @param dnId datanode uuid
* @param command
*/
- void addDatanodeCommand(UUID dnId, SCMCommand command);
+ void addDatanodeCommand(UUID dnId, SCMCommand<?> command);
/**
@@ -368,7 +368,7 @@ Map<SCMCommandProto.Type, Integer>
getTotalDatanodeCommandCounts(
* @return list of commands
*/
// TODO: We can give better name to this method!
- List<SCMCommand> getCommandQueue(UUID dnID);
+ List<SCMCommand<?>> getCommandQueue(UUID dnID);
/**
* Given datanode uuid, returns the DatanodeDetails for the node.
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 43d13e4ae6..ee6ad2b338 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -533,7 +533,7 @@ private boolean isVersionChange(String oldVersion, String
newVersion) {
* @return SCMheartbeat response.
*/
@Override
- public List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails,
+ public List<SCMCommand<?>> processHeartbeat(DatanodeDetails datanodeDetails,
CommandQueueReportProto queueReport) {
Preconditions.checkNotNull(datanodeDetails, "Heartbeat is missing " +
"DatanodeDetails.");
@@ -550,7 +550,7 @@ public List<SCMCommand> processHeartbeat(DatanodeDetails
datanodeDetails,
try {
Map<SCMCommandProto.Type, Integer> summary =
commandQueue.getDatanodeCommandSummary(datanodeDetails.getUuid());
- List<SCMCommand> commands =
+ List<SCMCommand<?>> commands =
commandQueue.getCommand(datanodeDetails.getUuid());
// Update the SCMCommand of deleteBlocksCommand Status
@@ -1635,7 +1635,7 @@ public int getPipeLineCount(DatanodeDetails
datanodeDetails)
}
@Override
- public void addDatanodeCommand(UUID dnId, SCMCommand command) {
+ public void addDatanodeCommand(UUID dnId, SCMCommand<?> command) {
writeLock().lock();
try {
this.commandQueue.addCommand(dnId, command);
@@ -1678,7 +1678,7 @@ public void onMessage(CommandForDatanode
commandForDatanode,
}
@Override
- public List<SCMCommand> getCommandQueue(UUID dnID) {
+ public List<SCMCommand<?>> getCommandQueue(UUID dnID) {
// Getting the queue actually clears it and returns the commands, so this
// is a write operation and not a read as the method name suggests.
writeLock().lock();
@@ -1846,7 +1846,7 @@ public void removeNode(DatanodeDetails datanodeDetails)
throws NodeNotFoundExcep
}
nodeStateManager.removeNode(datanodeDetails);
removeFromDnsToUuidMap(datanodeDetails.getUuid(), datanodeDetails.getIpAddress());
- final List<SCMCommand> cmdList = getCommandQueue(datanodeDetails.getUuid());
+ final List<SCMCommand<?>> cmdList = getCommandQueue(datanodeDetails.getUuid());
LOG.info("Clearing command queue of size {} for DN {}",
cmdList.size(), datanodeDetails);
} else {
LOG.warn("Node not decommissioned or dead, cannot remove: {}",
datanodeDetails);
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
index ddc87da038..58d2a8164e 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java
@@ -82,10 +82,10 @@ public SCMDatanodeHeartbeatDispatcher(NodeManager
nodeManager,
*
* @return list of SCMCommand
*/
- public List<SCMCommand> dispatch(SCMHeartbeatRequestProto heartbeat) {
+ public List<SCMCommand<?>> dispatch(SCMHeartbeatRequestProto heartbeat) {
DatanodeDetails datanodeDetails =
DatanodeDetails.getFromProtoBuf(heartbeat.getDatanodeDetails());
- List<SCMCommand> commands;
+ List<SCMCommand<?>> commands;
// If node is not registered, ask the node to re-register. Do not process
// Heartbeat for unregistered nodes.
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
index 4a549afa77..50b6d25f07 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeProtocolServer.java
@@ -302,7 +302,7 @@ private String
constructCommandAuditMap(List<SCMCommandProto> cmds) {
public SCMHeartbeatResponseProto sendHeartbeat(
SCMHeartbeatRequestProto heartbeat) throws IOException, TimeoutException {
List<SCMCommandProto> cmdResponses = new ArrayList<>();
- for (SCMCommand cmd : heartbeatDispatcher.dispatch(heartbeat)) {
+ for (SCMCommand<?> cmd : heartbeatDispatcher.dispatch(heartbeat)) {
cmdResponses.add(getCommandResponse(cmd, scm));
}
final OptionalLong term = getTermIfLeader();
@@ -352,7 +352,7 @@ private OptionalLong getTermIfLeader() {
* @throws IOException
*/
@VisibleForTesting
- public static SCMCommandProto getCommandResponse(SCMCommand cmd,
+ public static SCMCommandProto getCommandResponse(SCMCommand<?> cmd,
OzoneStorageContainerManager scm) throws IOException, TimeoutException {
SCMCommandProto.Builder builder = SCMCommandProto.newBuilder()
.setEncodedToken(cmd.getEncodedToken());
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
index 244f86e795..a0d39b0f01 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/MockNodeManager.java
@@ -102,7 +102,7 @@ public class MockNodeManager implements NodeManager {
private final List<DatanodeDetails> deadNodes;
private final Map<DatanodeDetails, SCMNodeStat> nodeMetricMap;
private final SCMNodeStat aggregateStat;
- private final Map<UUID, List<SCMCommand>> commandMap;
+ private final Map<UUID, List<SCMCommand<?>>> commandMap;
private Node2PipelineMap node2PipelineMap;
private final Node2ContainerMap node2ContainerMap;
private NetworkTopology clusterMap;
@@ -533,13 +533,13 @@ public void removeContainer(DatanodeDetails dd,
}
@Override
- public void addDatanodeCommand(UUID dnId, SCMCommand command) {
+ public void addDatanodeCommand(UUID dnId, SCMCommand<?> command) {
if (commandMap.containsKey(dnId)) {
- List<SCMCommand> commandList = commandMap.get(dnId);
+ List<SCMCommand<?>> commandList = commandMap.get(dnId);
Preconditions.checkNotNull(commandList);
commandList.add(command);
} else {
- List<SCMCommand> commandList = new LinkedList<>();
+ List<SCMCommand<?>> commandList = new LinkedList<>();
commandList.add(command);
commandMap.put(dnId, commandList);
}
@@ -656,7 +656,7 @@ public Set<ContainerID> getContainers(DatanodeDetails uuid)
{
// Returns the number of commands that is queued to this node manager.
public int getCommandCount(DatanodeDetails dd) {
- List<SCMCommand> list = commandMap.get(dd.getUuid());
+ List<SCMCommand<?>> list = commandMap.get(dd.getUuid());
return (list == null) ? 0 : list.size();
}
@@ -760,7 +760,7 @@ private synchronized void addEntryTodnsToUuidMap(
* @return SCMheartbeat response list
*/
@Override
- public List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails,
+ public List<SCMCommand<?>> processHeartbeat(DatanodeDetails datanodeDetails,
CommandQueueReportProto commandQueueReportProto) {
return null;
}
@@ -847,7 +847,7 @@ public void onMessage(CommandForDatanode commandForDatanode,
}
@Override
- public List<SCMCommand> getCommandQueue(UUID dnID) {
+ public List<SCMCommand<?>> getCommandQueue(UUID dnID) {
return null;
}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java
index ea1054784d..085a282444 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/SimpleMockNodeManager.java
@@ -269,7 +269,7 @@ public void removeContainer(DatanodeDetails datanodeDetails,
}
@Override
- public void addDatanodeCommand(UUID dnId, SCMCommand command) {
+ public void addDatanodeCommand(UUID dnId, SCMCommand<?> command) {
}
/**
@@ -341,7 +341,7 @@ public Map<SCMCommandProto.Type, Integer> getTotalDatanodeCommandCounts(
}
@Override
- public List<SCMCommand> getCommandQueue(UUID dnID) {
+ public List<SCMCommand<?>> getCommandQueue(UUID dnID) {
return null;
}
@@ -426,7 +426,7 @@ public RegisteredCommand register(DatanodeDetails datanodeDetails,
}
@Override
- public List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails,
+ public List<SCMCommand<?>> processHeartbeat(DatanodeDetails datanodeDetails,
CommandQueueReportProto commandQueueReportProto) {
return null;
}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
index 1b1e26959b..7555e1ab88 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/replication/TestReplicationManager.java
@@ -1311,7 +1311,7 @@ public void testSendLowPriorityReplicateContainerCommand()
replicationManager.sendLowPriorityReplicateContainerCommand(containerInfo,
0, src, target, scmDeadline);
- ArgumentCaptor<SCMCommand> command =
+ ArgumentCaptor<SCMCommand<?>> command =
ArgumentCaptor.forClass(SCMCommand.class);
ArgumentCaptor<UUID> targetUUID =
ArgumentCaptor.forClass(UUID.class);
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestCommandQueue.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestCommandQueue.java
index a917bc0225..d47a37f263 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestCommandQueue.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestCommandQueue.java
@@ -101,7 +101,7 @@ public void testSummaryUpdated() {
datanode2UUID, SCMCommandProto.Type.createPipelineCommand));
// Ensure the counts are cleared when the commands are retrieved
- List<SCMCommand> cmds = commandQueue.getCommand(datanode1UUID);
+ List<SCMCommand<?>> cmds = commandQueue.getCommand(datanode1UUID);
assertEquals(5, cmds.size());
assertEquals(0, commandQueue.getDatanodeCommandCount(
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
index 25802ddb81..cb2315f7fd 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java
@@ -644,7 +644,7 @@ public void testSetNodeOpStateAndCommandFired()
// If found mismatch, leader SCM fires a SetNodeOperationalStateCommand
// to update the opState persisted in Datanode.
scm.getScmContext().updateLeaderAndTerm(true, 1);
- List<SCMCommand> commands = nodeManager.processHeartbeat(dn);
+ List<SCMCommand<?>> commands = nodeManager.processHeartbeat(dn);
assertEquals(SetNodeOperationalStateCommand.class,
commands.get(0).getClass());
@@ -1763,7 +1763,7 @@ public void testHandlingSCMCommandEvent()
PipelineID.randomId())));
eq.processAll(1000L);
- List<SCMCommand> command =
+ List<SCMCommand<?>> command =
nodemanager.processHeartbeat(datanodeDetails);
// With dh registered, SCM will send create pipeline command to dn
assertThat(command.size()).isGreaterThanOrEqualTo(1);
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
index 6eb7d7c943..48508891f6 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/testutils/ReplicationNodeManagerMock.java
@@ -372,7 +372,7 @@ public RegisteredCommand register(DatanodeDetails dd,
* @return SCMheartbeat response list
*/
@Override
- public List<SCMCommand> processHeartbeat(DatanodeDetails dd,
+ public List<SCMCommand<?>> processHeartbeat(DatanodeDetails dd,
CommandQueueReportProto commandQueueReportProto) {
return null;
}
@@ -401,7 +401,7 @@ public void addNode(DatanodeDetails id, NodeStatus status) {
}
@Override
- public void addDatanodeCommand(UUID dnId, SCMCommand command) {
+ public void addDatanodeCommand(UUID dnId, SCMCommand<?> command) {
this.commandQueue.addCommand(dnId, command);
}
@@ -491,7 +491,7 @@ public void onMessage(CommandForDatanode commandForDatanode,
}
@Override
- public List<SCMCommand> getCommandQueue(UUID dnID) {
+ public List<SCMCommand<?>> getCommandQueue(UUID dnID) {
return null;
}
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManager.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManager.java
index 70e7fb5e5b..23c7bf3930 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManager.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/TestStorageContainerManager.java
@@ -454,10 +454,10 @@ public void testBlockDeletingThrottling() throws Exception {
GenericTestUtils.waitFor(() -> {
NodeManager nodeManager = cluster.getStorageContainerManager()
.getScmNodeManager();
- List<SCMCommand> commands = nodeManager.processHeartbeat(
+ List<SCMCommand<?>> commands = nodeManager.processHeartbeat(
nodeManager.getNodes(NodeStatus.inServiceHealthy()).get(0));
if (commands != null) {
- for (SCMCommand cmd : commands) {
+ for (SCMCommand<?> cmd : commands) {
if (cmd.getType() == SCMCommandProto.Type.deleteBlocksCommand) {
List<DeletedBlocksTransaction> deletedTXs =
((DeleteBlocksCommand) cmd).blocksTobeDeleted();
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconNodeManager.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconNodeManager.java
index d9d1fd5b4c..fbbb58a124 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconNodeManager.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconNodeManager.java
@@ -222,9 +222,9 @@ public void onMessage(CommandForDatanode commandForDatanode,
* @return SCMheartbeat response.
*/
@Override
- public List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails,
+ public List<SCMCommand<?>> processHeartbeat(DatanodeDetails datanodeDetails,
CommandQueueReportProto queueReport) {
- List<SCMCommand> cmds = new ArrayList<>();
+ List<SCMCommand<?>> cmds = new ArrayList<>();
long currentTime = Time.now();
if (needUpdate(datanodeDetails, currentTime)) {
cmds.add(new ReregisterCommand());
diff --git
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconNodeManager.java
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconNodeManager.java
index e484295feb..9fa7434076 100644
---
a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconNodeManager.java
+++
b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/scm/TestReconNodeManager.java
@@ -174,7 +174,7 @@ public void testReconNodeDB() throws IOException, NodeNotFoundException {
.getOpStateExpiryEpochSeconds());
// Upon processing the heartbeat, the illegal command should be filtered out
- List<SCMCommand> returnedCmds =
+ List<SCMCommand<?>> returnedCmds =
reconNodeManager.processHeartbeat(datanodeDetails);
assertEquals(1, returnedCmds.size());
assertEquals(SCMCommandProto.Type.reregisterCommand,
@@ -272,7 +272,7 @@ public void testDatanodeUpdate() throws IOException {
datanodeDetails.setHostName("hostname2");
// Upon processing the heartbeat, the illegal command should be filtered out
- List<SCMCommand> returnedCmds =
+ List<SCMCommand<?>> returnedCmds =
reconNodeManager.processHeartbeat(datanodeDetails);
assertEquals(1, returnedCmds.size());
assertEquals(SCMCommandProto.Type.reregisterCommand,
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]