sodonnel commented on code in PR #4055: URL: https://github.com/apache/ozone/pull/4055#discussion_r1047112692
########## hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/RatisOverReplicationHandler.java: ########## @@ -0,0 +1,247 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdds.scm.container.replication; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.PlacementPolicy; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand; +import org.apache.hadoop.ozone.protocol.commands.SCMCommand; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.LinkedHashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.UUID; +import java.util.stream.Collectors; + +/** + * This class handles Ratis containers that are over replicated. 
It should + * be used to obtain SCMCommands that can be sent to datanodes to solve + * over replication. + */ +public class RatisOverReplicationHandler + extends AbstractOverReplicationHandler { + public static final Logger LOG = + LoggerFactory.getLogger(RatisOverReplicationHandler.class); + + public RatisOverReplicationHandler(PlacementPolicy placementPolicy) { + super(placementPolicy); + } + + /** + * Identifies datanodes where the specified container's replicas can be + * deleted. Creates the SCMCommands to be sent to datanodes. + * + * @param replicas Set of container replicas. + * @param pendingOps Pending (in flight) replications or + * deletions for this + * container. + * @param result Health check result indicating over + * replication + * @param minHealthyForMaintenance Number of healthy replicas that must be + * available for a DN to enter maintenance + * @return Returns a map of Datanodes and SCMCommands that can be sent to + * delete replicas on those datanodes. + */ + @Override + public Map<DatanodeDetails, SCMCommand<?>> processAndCreateCommands( + Set<ContainerReplica> replicas, List<ContainerReplicaOp> pendingOps, + ContainerHealthResult result, int minHealthyForMaintenance) throws + IOException { + ContainerInfo containerInfo = result.getContainerInfo(); + LOG.debug("Handling container {}.", containerInfo); + + // count pending adds and deletes + int pendingAdd = 0, pendingDelete = 0; + for (ContainerReplicaOp op : pendingOps) { + if (op.getOpType() == ContainerReplicaOp.PendingOpType.ADD) { + pendingAdd++; + } else if (op.getOpType() == ContainerReplicaOp.PendingOpType.DELETE) { + pendingDelete++; + } + } + RatisContainerReplicaCount replicaCount = + new RatisContainerReplicaCount(containerInfo, replicas, pendingAdd, + pendingDelete, containerInfo.getReplicationFactor().getNumber(), + minHealthyForMaintenance); + + // verify that this container is actually over replicated + if (!verifyOverReplication(replicaCount)) { + return 
Collections.emptyMap(); + } + + // get number of excess replicas + int excess = replicaCount.getExcessRedundancy(true); + LOG.info("Container {} is over replicated. Actual replica count is {}, " + + "with {} pending delete(s). Expected replica count is {}.", + containerInfo.containerID(), + replicaCount.getReplicas().size(), pendingDelete, + replicaCount.getReplicationFactor()); + + // get replicas that can be deleted, in sorted order + List<ContainerReplica> eligibleReplicas = + getEligibleReplicas(replicaCount, pendingOps); + if (eligibleReplicas.size() == 0) { + LOG.info("Did not find any replicas that are eligible to be deleted for" + + " container {}.", containerInfo); + return Collections.emptyMap(); + } + + return createCommands(containerInfo, eligibleReplicas, excess); + } + + private boolean verifyOverReplication( + RatisContainerReplicaCount replicaCount) { + if (!replicaCount.isOverReplicated()) { + LOG.info("Container {} is actually not over-replicated any more.", + replicaCount.getContainer().containerID()); + return false; + } + return true; + } + + /** + * Finds replicas that are eligible to be deleted, sorted to avoid + * potential data loss. 
+ * @see + * <a href="https://issues.apache.org/jira/browse/HDDS-4589">HDDS-4589</a> + * @param replicaCount ContainerReplicaCount object for the container + * @param pendingOps Pending adds and deletes + * @return List of ContainerReplica sorted using + * {@link RatisOverReplicationHandler#sortReplicas(Collection)} + */ + private List<ContainerReplica> getEligibleReplicas( + ContainerReplicaCount replicaCount, List<ContainerReplicaOp> pendingOps) { + // sort replicas so that they can be selected in a deterministic way + List<ContainerReplica> eligibleReplicas = + sortReplicas(replicaCount.getReplicas()); + + // retain one replica per unique origin datanode if the container is not + // closed + final Map<UUID, ContainerReplica> uniqueReplicas = + new LinkedHashMap<>(); + if (replicaCount.getContainer().getState() != + HddsProtos.LifeCycleState.CLOSED) { + eligibleReplicas.stream() + // get replicas with state that matches container state + .filter(r -> ReplicationManager.compareState( + replicaCount.getContainer().getState(), + r.getState())) + .forEach(r -> uniqueReplicas + .putIfAbsent(r.getOriginDatanodeId(), r)); + + // note that this preserves order of the List + eligibleReplicas.removeAll(uniqueReplicas.values()); + } + + Set<DatanodeDetails> pendingDeletion = new HashSet<>(); + // collect the DNs that are going to have their container replica deleted + for (ContainerReplicaOp op : pendingOps) { + if (op.getOpType() == ContainerReplicaOp.PendingOpType.DELETE) { + pendingDeletion.add(op.getTarget()); + } + } + + // replicas that are not on IN_SERVICE nodes or are already pending + // delete are not eligible + // TODO what about nodes that are not healthy? 
+ eligibleReplicas.removeIf( + replica -> replica.getDatanodeDetails().getPersistedOpState() != + HddsProtos.NodeOperationalState.IN_SERVICE || + pendingDeletion.contains(replica.getDatanodeDetails())); + + return eligibleReplicas; + } + + /** + * Sorts replicas using {@link ContainerReplica#hashCode()} (ContainerID and + * DatanodeDetails). + * @param replicas replicas to sort + * @return sorted List + */ + private List<ContainerReplica> sortReplicas( + Collection<ContainerReplica> replicas) { + return replicas.stream() + .sorted(Comparator.comparingLong(ContainerReplica::hashCode)) + .collect(Collectors.toList()); + } + + private Map<DatanodeDetails, SCMCommand<?>> createCommands( + ContainerInfo containerInfo, List<ContainerReplica> replicas, + int excess) { + Map<DatanodeDetails, SCMCommand<?>> commands = new HashMap<>(); + + /* + Over replication means we have enough healthy replicas, so unhealthy + replicas can be deleted. This might make the container violate placement + policy. + */ + List<ContainerReplica> unhealthyReplicas = new ArrayList<>(); + for (ContainerReplica replica : replicas) { + if (excess == 0) { + return commands; + } + if (!ReplicationManager.compareState( + containerInfo.getState(), replica.getState())) { + commands.put(replica.getDatanodeDetails(), + createDeleteCommand(containerInfo)); + unhealthyReplicas.add(replica); + excess--; + } + } + replicas.removeAll(unhealthyReplicas); + + /* + Remove excess replicas if that does not make the container mis replicated. Review Comment: We hope to improve this area by asking the placement policy to tell us which replicas to remove while keeping the placement correct. However, that is not done yet, so for now it is best to keep this as it is. -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. 
To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
