swamirishi commented on code in PR #3984:
URL: https://github.com/apache/ozone/pull/3984#discussion_r1029796568


##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java:
##########
@@ -158,96 +169,63 @@ public Map<DatanodeDetails, SCMCommand<?>> 
processAndCreateCommands(
     final ContainerID id = container.containerID();
     final Map<DatanodeDetails, SCMCommand<?>> commands = new HashMap<>();
     try {
-     // State is UNDER_REPLICATED
       final List<DatanodeDetails> deletionInFlight = new ArrayList<>();
       for (ContainerReplicaOp op : pendingOps) {
         if (op.getOpType() == ContainerReplicaOp.PendingOpType.DELETE) {
           deletionInFlight.add(op.getTarget());
         }
       }
-      List<Integer> missingIndexes = replicaCount.unavailableIndexes(true);
-      Map<Integer, Pair<ContainerReplica, NodeStatus>> sources =
-              filterSources(replicas, deletionInFlight);
-      List<DatanodeDetails> nodes =
-              sources.values().stream().map(Pair::getLeft)
-                      .map(ContainerReplica::getDatanodeDetails)
-                      .filter(datanodeDetails ->
-                              datanodeDetails.getPersistedOpState() ==
-                              HddsProtos.NodeOperationalState.IN_SERVICE)
-                      .collect(Collectors.toList());
-      // We got the missing indexes, this is excluded any decommissioning
-      // indexes. Find the good source nodes.
-      if (missingIndexes.size() > 0) {
 
-        LOG.debug("Missing indexes detected for the container {}." +
-                " The missing indexes are {}", id, missingIndexes);
-        // We have source nodes.
-        if (sources.size() >= repConfig.getData()) {
-          final List<DatanodeDetails> selectedDatanodes = getTargetDatanodes(
-              excludedNodes, container, missingIndexes.size());
-
-          if (validatePlacement(nodes, selectedDatanodes)) {
-            excludedNodes.addAll(selectedDatanodes);
-            nodes.addAll(selectedDatanodes);
-            List<ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex>
-                    sourceDatanodesWithIndex = new ArrayList<>();
-            for (Pair<ContainerReplica, NodeStatus> src : sources.values()) {
-              sourceDatanodesWithIndex.add(
-                      new ReconstructECContainersCommand
-                              .DatanodeDetailsAndReplicaIndex(
-                              src.getLeft().getDatanodeDetails(),
-                              src.getLeft().getReplicaIndex()));
-            }
+      Map<Integer, Pair<ContainerReplica, NodeStatus>> sources =
+          filterSources(replicas, deletionInFlight);
+      List<DatanodeDetails> availableSourceNodes =
+          sources.values().stream().map(Pair::getLeft)
+              .map(ContainerReplica::getDatanodeDetails)
+              .filter(datanodeDetails ->
+                  datanodeDetails.getPersistedOpState() ==
+                      HddsProtos.NodeOperationalState.IN_SERVICE)
+              .collect(Collectors.toList());
 
-            final ReconstructECContainersCommand reconstructionCommand =
-                    new 
ReconstructECContainersCommand(id.getProtobuf().getId(),
-                            sourceDatanodesWithIndex, selectedDatanodes,
-                            int2byte(missingIndexes),
-                            repConfig);
-            // Keeping the first target node as coordinator.
-            commands.put(selectedDatanodes.get(0), reconstructionCommand);
-          }
-        } else {
-          LOG.warn("Cannot proceed for EC container reconstruction for {}, due"
-              + " to insufficient source replicas found. Number of source "
-              + "replicas needed: {}. Number of available source replicas are:"
-              + " {}. Available sources are: {}", container.containerID(),
-              repConfig.getData(), sources.size(), sources);
+      try {
+        processMissingIndexes(replicaCount, sources, availableSourceNodes,
+            excludedNodes, commands);
+        processDecommissioningIndexes(replicaCount, replicas,
+            availableSourceNodes, excludedNodes, commands);
+        processMaintenanceOnlyIndexes(replicaCount, replicas, excludedNodes,
+            commands);
+        // TODO - we should be able to catch SCMException here and check the
+        //        result code but the RackAware topology never sets the code.
+      } catch (CannotFindTargetsException e) {
+        // If we get here, we tried to find nodes to fix the under replication
+        // issues, but were not able to find any at some stage, and the
+        // placement policy threw an exception.
+        // At this stage. If the cluster is small and there are some
+        // over replicated indexes, it could stop us finding a new node as 
there
+        // are no more nodes left to try.
+        // If the container is also over replicated, then hand it off to the
+        // over-rep handler, and after those over-rep indexes are cleared the
+        // under replication can be re-tried in the next iteration of RM.
+        // However, we should only hand off to the over rep handler if there 
are
+        // no commands already created. If we have some commands, they may
+        // attempt to use sources the over-rep handler would remove. So we
+        // should let the commands we have created be processed, and then the
+        // container will be re-processed in a further RM pass.
+        LOG.debug("Unable to located new target nodes for container {}",
+            container, e);
+        if (commands.size() > 0) {
+          LOG.debug("Some commands have already been created, so returning " +
+              "with them only");
+          return commands;
         }
-      }
-      Set<Integer> decomIndexes = 
replicaCount.decommissioningOnlyIndexes(true);
-      if (decomIndexes.size() > 0) {
-        final List<DatanodeDetails> selectedDatanodes =
-            getTargetDatanodes(excludedNodes, container, decomIndexes.size());
-        if (validatePlacement(nodes, selectedDatanodes)) {
-          excludedNodes.addAll(selectedDatanodes);
-          Iterator<DatanodeDetails> iterator = selectedDatanodes.iterator();
-          // In this case we need to do one to one copy.
-          for (ContainerReplica replica : replicas) {
-            if (decomIndexes.contains(replica.getReplicaIndex())) {
-              if (!iterator.hasNext()) {
-                LOG.warn("Couldn't find enough targets. Available source"
-                    + " nodes: {}, the target nodes: {}, excluded nodes: {} 
and"
-                    + "  the decommission indexes: {}",
-                    replicas, selectedDatanodes, excludedNodes, decomIndexes);
-                break;
-              }
-              DatanodeDetails decommissioningSrcNode
-                  = replica.getDatanodeDetails();
-              final ReplicateContainerCommand replicateCommand =
-                  new ReplicateContainerCommand(id.getProtobuf().getId(),
-                      ImmutableList.of(decommissioningSrcNode));
-              // For EC containers, we need to track the replica index which is
-              // to be replicated, so add it to the command.
-              replicateCommand.setReplicaIndex(replica.getReplicaIndex());
-              DatanodeDetails target = iterator.next();
-              commands.put(target, replicateCommand);
-            }
-          }
+        if (replicaCount.isOverReplicated()) {

Review Comment:
   Wouldn't that be better then only one thread will access overreplication 
handler. Otherwise underreplication is calling over replication.



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: [email protected]

For queries about this service, please contact Infrastructure at:
[email protected]


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to