xichen01 commented on code in PR #4988: URL: https://github.com/apache/ozone/pull/4988#discussion_r1265442916
########## hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMDeleteBlocksCommandStatusManager.java: ########## @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.hadoop.hdds.scm.block; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.time.Instant; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +import static org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.EXECUTED; +import static org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.NEED_RESEND; +import static org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.PENDING_EXECUTED; +import static org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.SENT; +import static org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.TO_BE_SENT; + +/** + * SCM DeleteBlocksCommand manager. + */ +public class SCMDeleteBlocksCommandStatusManager { + public static final Logger LOG = Review Comment: Done. ########## hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java: ########## @@ -515,6 +516,17 @@ public List<SCMCommand> processHeartbeat(DatanodeDetails datanodeDetails, commandQueue.getDatanodeCommandSummary(datanodeDetails.getUuid()); List<SCMCommand> commands = commandQueue.getCommand(datanodeDetails.getUuid()); + + // Update the SCMCommand of deleteBlocksCommand Status + for (SCMCommand command : commands) { Review Comment: Done. ########## hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMDeleteBlocksCommandStatusManager.java: ########## @@ -0,0 +1,375 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + * + */ + +package org.apache.hadoop.hdds.scm.block; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.time.Duration; +import java.time.Instant; +import java.util.Arrays; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; + +import static org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.EXECUTED; +import static org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.NEED_RESEND; +import static org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.PENDING_EXECUTED; +import static org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.SENT; +import static org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.TO_BE_SENT; + +/** + * SCM DeleteBlocksCommand manager. + */ +public class SCMDeleteBlocksCommandStatusManager { + public static final Logger LOG = + LoggerFactory.getLogger(SCMDeleteBlocksCommandStatusManager.class); + + /** + * Status of SCMDeleteBlocksCommand. + */ + public enum CmdStatus { + // The DeleteBlocksCommand has not yet been sent. + // This is the initial status of the command after it's created. + TO_BE_SENT, + // This status indicates that the DeleteBlocksCommand has been sent + // to the DataNode, but the Datanode has not reported any new status + // for the DeleteBlocksCommand. + SENT, + // The DeleteBlocksCommand has been received by Datanode and + // is waiting for executed. + PENDING_EXECUTED, + // The DeleteBlocksCommand was executed, and the execution was successful + EXECUTED, + // The DeleteBlocksCommand was executed but failed to execute, + // or was lost before it was executed. + NEED_RESEND + } + + private static final CmdStatus DEFAULT_STATUS = TO_BE_SENT; + + private final Set<CmdStatus> statusesRequiringTimeout = new HashSet<>( + Arrays.asList(SENT, PENDING_EXECUTED)); + private final Set<CmdStatus> finialStatuses = new HashSet<>( + Arrays.asList(EXECUTED, NEED_RESEND)); + private final Set<CmdStatus> failedStatuses = new HashSet<>( + Arrays.asList(NEED_RESEND)); + + private final Map<UUID, Map<Long, CmdStatusData>> scmCmdStatusRecord; + + public SCMDeleteBlocksCommandStatusManager() { + this.scmCmdStatusRecord = new ConcurrentHashMap<>(); + } + + public static CmdStatusData createScmCmdStatusData( + UUID dnId, long scmCmdId, Set<Long> deletedBlocksTxIds) { + return new CmdStatusData(dnId, scmCmdId, deletedBlocksTxIds); + } + + protected static final class CmdStatusData { + private final UUID dnId; + private final long scmCmdId; + private final Set<Long> deletedBlocksTxIds; + private Instant updateTime; + private CmdStatus status; + + private CmdStatusData( + UUID dnId, long scmTxID, Set<Long> deletedBlocksTxIds) { + this.dnId = dnId; + this.scmCmdId = scmTxID; + this.deletedBlocksTxIds = deletedBlocksTxIds; + setStatus(DEFAULT_STATUS); + } + + public boolean isContainDeletedBlocksTx(long deletedBlocksTxId) { + return deletedBlocksTxIds.contains(deletedBlocksTxId); + } + + public Set<Long> getDeletedBlocksTxIds() { + return deletedBlocksTxIds; + } + + public UUID getDnId() { + return dnId; + } + + public long getScmCmdId() { + return scmCmdId; + } + + public CmdStatus getStatus() { + return status; + } + + public void setStatus(CmdStatus status) { + this.updateTime = Instant.now(); + this.status = status; + } + + public Instant getUpdateTime() { + return updateTime; + } + + @Override + public String toString() { + return "ScmTxStateMachine" + + "{dnId=" + dnId + + ", scmTxID=" + scmCmdId + + ", deletedBlocksTxIds=" + deletedBlocksTxIds + + ", updateTime=" + updateTime + + ", status=" + status + + '}'; + } + } + + public void recordScmCommand(CmdStatusData statusData) { + LOG.debug("Record ScmCommand: {}", statusData); + scmCmdStatusRecord.computeIfAbsent(statusData.getDnId(), k -> + new ConcurrentHashMap<>()).put(statusData.getScmCmdId(), statusData); + } + + public Map<UUID, Map<Long, CmdStatus>> getCommandStatusByTxId( + Set<UUID> dnIds) { + Map<UUID, Map<Long, CmdStatus>> result = + new HashMap<>(scmCmdStatusRecord.size()); + + for (UUID dnId : dnIds) { + if (scmCmdStatusRecord.get(dnId) == null) { + continue; + } + Map<Long, CmdStatus> dnStatusMap = new HashMap<>(); + Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId); + for (CmdStatusData statusData : record.values()) { + CmdStatus status = statusData.getStatus(); + for (Long deletedBlocksTxId : statusData.getDeletedBlocksTxIds()) { + dnStatusMap.put(deletedBlocksTxId, status); + } + } + result.put(dnId, dnStatusMap); + } + + return result; + } + + public void onSent(UUID dnId, long scmCmdId) { + updateStatus(dnId, scmCmdId, SENT); + } + + public void updateStatusByDNCommandStatus(UUID dnId, long scmCmdId, + CommandStatus.Status newState) { + CmdStatus status = fromProtoCommandStatus(newState); + if (status != null) { + updateStatus(dnId, scmCmdId, status); + } + } + + public void cleanAllTimeoutSCMCommand(long timeoutMs) { + for (UUID dnId : scmCmdStatusRecord.keySet()) { Review Comment: Done. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
