sumitagrawl commented on code in PR #4988:
URL: https://github.com/apache/ozone/pull/4988#discussion_r1259979240


##########
hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/report/CommandStatusReportPublisher.java:
##########
@@ -78,9 +78,9 @@ protected CommandStatusReportsProto getReport() {
       // If status is still pending then don't remove it from map as
       // CommandHandler will change its status when it works on this command.
       if (!cmdStatus.getStatus().equals(Status.PENDING)) {
-        builder.addCmdStatus(cmdStatus.getProtoBufMessage());

Review Comment:
   What is the impact of this change on the DN / SCM side?



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java:
##########
@@ -515,6 +516,17 @@ public List<SCMCommand> processHeartbeat(DatanodeDetails 
datanodeDetails,
           commandQueue.getDatanodeCommandSummary(datanodeDetails.getUuid());
       List<SCMCommand> commands =
           commandQueue.getCommand(datanodeDetails.getUuid());
+
+      // Update the SCMCommand of deleteBlocksCommand Status
+      for (SCMCommand command : commands) {

Review Comment:
   IMO, we could register a listener that is notified when a command is sent, 
so that the same notification can easily be added for other commands if required.



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMDeleteBlocksCommandStatusManager.java:
##########
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.block;
+
+import com.google.common.annotations.VisibleForTesting;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static 
org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.EXECUTED;
+import static 
org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.NEED_RESEND;
+import static 
org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.PENDING_EXECUTED;
+import static 
org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.SENT;
+import static 
org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.TO_BE_SENT;
+
+/**
+ * SCM DeleteBlocksCommand manager.
+ */
+public class SCMDeleteBlocksCommandStatusManager {
+  public static final Logger LOG =

Review Comment:
   For handling deleted block status:
   - DeletedBlockLogStateManager already exists, so keeping another class for 
the same purpose may not be required.
   - Command status is already managed using transactionToRetryCountMap and 
transactionToDNsCommitMap.
   IMO, we need to combine this logic to provide a single status manager for 
deleted blocks.



##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMDeleteBlocksCommandStatusManager.java:
##########
@@ -0,0 +1,375 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.hadoop.hdds.scm.block;
+
+import com.google.common.annotations.VisibleForTesting;
+import 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.CommandStatus;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.UUID;
+import java.util.concurrent.ConcurrentHashMap;
+
+import static 
org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.EXECUTED;
+import static 
org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.NEED_RESEND;
+import static 
org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.PENDING_EXECUTED;
+import static 
org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.SENT;
+import static 
org.apache.hadoop.hdds.scm.block.SCMDeleteBlocksCommandStatusManager.CmdStatus.TO_BE_SENT;
+
+/**
+ * SCM DeleteBlocksCommand manager.
+ */
+public class SCMDeleteBlocksCommandStatusManager {
+  public static final Logger LOG =
+      LoggerFactory.getLogger(SCMDeleteBlocksCommandStatusManager.class);
+
+  /**
+   * Status of SCMDeleteBlocksCommand.
+   */
+  public enum CmdStatus {
+    // The DeleteBlocksCommand has not yet been sent.
+    // This is the initial status of the command after it's created.
+    TO_BE_SENT,
+    // This status indicates that the DeleteBlocksCommand has been sent
+    // to the DataNode, but the Datanode has not reported any new status
+    // for the DeleteBlocksCommand.
+    SENT,
+    // The DeleteBlocksCommand has been received by Datanode and
+    // is waiting for executed.
+    PENDING_EXECUTED,
+    // The DeleteBlocksCommand was executed, and the execution was successful
+    EXECUTED,
+    // The DeleteBlocksCommand was executed but failed to execute,
+    // or was lost before it was executed.
+    NEED_RESEND
+  }
+
+  private static final CmdStatus DEFAULT_STATUS = TO_BE_SENT;
+
+  private final Set<CmdStatus> statusesRequiringTimeout = new HashSet<>(
+      Arrays.asList(SENT, PENDING_EXECUTED));
+  private final Set<CmdStatus> finialStatuses = new HashSet<>(
+      Arrays.asList(EXECUTED, NEED_RESEND));
+  private final Set<CmdStatus> failedStatuses = new HashSet<>(
+      Arrays.asList(NEED_RESEND));
+
+  private final Map<UUID, Map<Long, CmdStatusData>> scmCmdStatusRecord;
+
+  public SCMDeleteBlocksCommandStatusManager() {
+    this.scmCmdStatusRecord = new ConcurrentHashMap<>();
+  }
+
+  public static CmdStatusData createScmCmdStatusData(
+      UUID dnId, long scmCmdId, Set<Long> deletedBlocksTxIds) {
+    return new CmdStatusData(dnId, scmCmdId, deletedBlocksTxIds);
+  }
+
+  protected static final class CmdStatusData {
+    private final UUID dnId;
+    private final long scmCmdId;
+    private final Set<Long> deletedBlocksTxIds;
+    private Instant updateTime;
+    private CmdStatus status;
+
+    private CmdStatusData(
+        UUID dnId, long scmTxID, Set<Long> deletedBlocksTxIds) {
+      this.dnId = dnId;
+      this.scmCmdId = scmTxID;
+      this.deletedBlocksTxIds = deletedBlocksTxIds;
+      setStatus(DEFAULT_STATUS);
+    }
+
+    public boolean isContainDeletedBlocksTx(long deletedBlocksTxId) {
+      return deletedBlocksTxIds.contains(deletedBlocksTxId);
+    }
+
+    public Set<Long> getDeletedBlocksTxIds() {
+      return deletedBlocksTxIds;
+    }
+
+    public UUID getDnId() {
+      return dnId;
+    }
+
+    public long getScmCmdId() {
+      return scmCmdId;
+    }
+
+    public CmdStatus getStatus() {
+      return status;
+    }
+
+    public void setStatus(CmdStatus status) {
+      this.updateTime = Instant.now();
+      this.status = status;
+    }
+
+    public Instant getUpdateTime() {
+      return updateTime;
+    }
+
+    @Override
+    public String toString() {
+      return "ScmTxStateMachine" +
+          "{dnId=" + dnId +
+          ", scmTxID=" + scmCmdId +
+          ", deletedBlocksTxIds=" + deletedBlocksTxIds +
+          ", updateTime=" + updateTime +
+          ", status=" + status +
+          '}';
+    }
+  }
+
+  public void recordScmCommand(CmdStatusData statusData) {
+    LOG.debug("Record ScmCommand: {}", statusData);
+    scmCmdStatusRecord.computeIfAbsent(statusData.getDnId(), k ->
+        new ConcurrentHashMap<>()).put(statusData.getScmCmdId(), statusData);
+  }
+
+  public Map<UUID, Map<Long, CmdStatus>> getCommandStatusByTxId(
+      Set<UUID> dnIds) {
+    Map<UUID, Map<Long, CmdStatus>> result =
+        new HashMap<>(scmCmdStatusRecord.size());
+
+    for (UUID dnId : dnIds) {
+      if (scmCmdStatusRecord.get(dnId) == null) {
+        continue;
+      }
+      Map<Long, CmdStatus> dnStatusMap = new HashMap<>();
+      Map<Long, CmdStatusData> record = scmCmdStatusRecord.get(dnId);
+      for (CmdStatusData statusData : record.values()) {
+        CmdStatus status = statusData.getStatus();
+        for (Long deletedBlocksTxId : statusData.getDeletedBlocksTxIds()) {
+          dnStatusMap.put(deletedBlocksTxId, status);
+        }
+      }
+      result.put(dnId, dnStatusMap);
+    }
+
+    return result;
+  }
+
+  public void onSent(UUID dnId, long scmCmdId) {
+    updateStatus(dnId, scmCmdId, SENT);
+  }
+
+  public void updateStatusByDNCommandStatus(UUID dnId, long scmCmdId,
+      CommandStatus.Status newState) {
+    CmdStatus status = fromProtoCommandStatus(newState);
+    if (status != null) {
+      updateStatus(dnId, scmCmdId, status);
+    }
+  }
+
+  public void cleanAllTimeoutSCMCommand(long timeoutMs) {
+    for (UUID dnId : scmCmdStatusRecord.keySet()) {

Review Comment:
   For a dead node, this cache needs to be cleaned up for that DN; the cleanup 
should be triggered from DeadNodeHandler.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to