adoroszlai commented on code in PR #3542: URL: https://github.com/apache/ozone/pull/3542#discussion_r908488858
########## hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/UnderReplicationHandler.java: ########## @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.container.replication; + +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import org.apache.hadoop.ozone.protocol.commands.SCMCommand; + +import java.util.List; +import java.util.Map; +import java.util.Set; + +/** + * This interface to create respective commands after processing the replicas + * with pending ops and health check results. + */ +public interface UnderReplicationHandler { + + /** + * Identify a new set of datanode(s) to replicate/reconstruct the container + * and form the SCM commands to send it to DN. + * + * @param replicas - An instance of ContainerReplicaCount, containing the + * current replica count and inflight adds and deletes

Review Comment: Nit: comment seems stale, `replicas` is a `Set`.
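Maybe something along these lines (wording is only a sketch of what the updated javadoc could say):
```suggestion
   * @param replicas - The set of replicas currently reported for the
   *                   container
```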
########## hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java: ########## @@ -0,0 +1,243 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hdds.scm.container.replication; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.hadoop.hdds.client.ECReplicationConfig; +import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.conf.StorageUnit; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.hdds.scm.PlacementPolicy; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import org.apache.hadoop.hdds.scm.container.ECContainerReplicaCount; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand; +import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; +import org.apache.hadoop.ozone.protocol.commands.SCMCommand; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Handles the EC Under replication processing and forming the respective SCM + * commands. + */ +public class ECUnderReplicationHandler implements UnderReplicationHandler { + + public static final Logger LOG = + LoggerFactory.getLogger(ECUnderReplicationHandler.class); + private final ECContainerHealthCheck ecContainerHealthCheck = + new ECContainerHealthCheck(); + private final PlacementPolicy containerPlacement; + private final long currentContainerSize; + private final NodeManager nodeManager; + + public ECUnderReplicationHandler( + final PlacementPolicy containerPlacement, final ConfigurationSource conf, + NodeManager nodeManager) { + this.containerPlacement = containerPlacement; + this.currentContainerSize = (long) conf + .getStorageSize(ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, + ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); + this.nodeManager = nodeManager; + } + + /** + * Identify a new set of datanode(s) to reconstruct the container and form the + * SCM command to send it to DN. In the case of decommission, it will just + * generate the replicate commands instead of reconstruction commands. + * + * @param replicas - An instance of ContainerReplicaCount, containing the + * current replica count and inflight adds and deletes + * @param pendingOps - Inflight replications and deletion ops. + * @param result - Health check result. + * @param remainingMaintenanceRedundancy - represents that how many nodes go + * into maintenance. + * @return Returns the key value pair of destination dn where the command gets + * executed and the command itself.
+ */ + @Override + public Map<DatanodeDetails, SCMCommand> processAndCreateCommands( + final Set<ContainerReplica> replicas, + final List<ContainerReplicaOp> pendingOps, + final ContainerHealthResult result, + final int remainingMaintenanceRedundancy) { + ContainerInfo container = result.getContainerInfo(); + ECReplicationConfig repConfig = + (ECReplicationConfig) container.getReplicationConfig(); + final ECContainerReplicaCount replicaCount = + new ECContainerReplicaCount(container, replicas, pendingOps, + remainingMaintenanceRedundancy); + + ContainerHealthResult currentUnderRepRes = ecContainerHealthCheck + .checkHealth(container, replicas, pendingOps, + remainingMaintenanceRedundancy); + + LOG.debug("Handling under-replicated EC container: {}", container); + if (currentUnderRepRes + .getHealthState() != ContainerHealthResult.HealthState + .UNDER_REPLICATED) { + LOG.info("The container {} state changed and it's not in under" + + " replication any more. Current state is: {}", + container.getContainerID(), currentUnderRepRes); + return null; + } + ContainerHealthResult.UnderReplicatedHealthResult containerHealthResult = + ((ContainerHealthResult.UnderReplicatedHealthResult) + currentUnderRepRes); + if (containerHealthResult.isSufficientlyReplicatedAfterPending()) { + LOG.info("The container {} with replicas {} is sufficiently replicated", + container.getContainerID(), replicaCount.getReplicas()); + return null; + } + if (replicaCount.unRecoverable()) { + LOG.warn("The container {} is unrecoverable. The available replicas" + + " are: {}.", container.containerID(), replicaCount.getReplicas()); + }

Review Comment: Is it intentional that the method proceeds in the unrecoverable case?
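If it is not supposed to proceed, an early return might be clearer, e.g. (sketch only; the `!replicaCount.unRecoverable()` check further down may already be the intended guard):
```suggestion
    if (replicaCount.unRecoverable()) {
      LOG.warn("The container {} is unrecoverable. The available replicas"
          + " are: {}.", container.containerID(), replicaCount.getReplicas());
      return null;
    }
```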
########## hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java: ########## @@ -0,0 +1,243 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hdds.scm.container.replication; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.hadoop.hdds.client.ECReplicationConfig; +import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.conf.StorageUnit; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.hdds.scm.PlacementPolicy; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import org.apache.hadoop.hdds.scm.container.ECContainerReplicaCount; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand; +import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; +import org.apache.hadoop.ozone.protocol.commands.SCMCommand; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Handles the EC Under replication processing and forming the respective SCM + * commands. + */ +public class ECUnderReplicationHandler implements UnderReplicationHandler { + + public static final Logger LOG = + LoggerFactory.getLogger(ECUnderReplicationHandler.class); + private final ECContainerHealthCheck ecContainerHealthCheck = + new ECContainerHealthCheck(); + private final PlacementPolicy containerPlacement; + private final long currentContainerSize; + private final NodeManager nodeManager; + + public ECUnderReplicationHandler( + final PlacementPolicy containerPlacement, final ConfigurationSource conf, + NodeManager nodeManager) { + this.containerPlacement = containerPlacement; + this.currentContainerSize = (long) conf + .getStorageSize(ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, + ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); + this.nodeManager = nodeManager; + } + + /** + * Identify a new set of datanode(s) to reconstruct the container and form the + * SCM command to send it to DN. In the case of decommission, it will just + * generate the replicate commands instead of reconstruction commands. + * + * @param replicas - An instance of ContainerReplicaCount, containing the + * current replica count and inflight adds and deletes + * @param pendingOps - Inflight replications and deletion ops. + * @param result - Health check result. + * @param remainingMaintenanceRedundancy - represents that how many nodes go + * into maintenance. + * @return Returns the key value pair of destination dn where the command gets + * executed and the command itself.
+ */ + @Override + public Map<DatanodeDetails, SCMCommand> processAndCreateCommands( + final Set<ContainerReplica> replicas, + final List<ContainerReplicaOp> pendingOps, + final ContainerHealthResult result, + final int remainingMaintenanceRedundancy) { + ContainerInfo container = result.getContainerInfo(); + ECReplicationConfig repConfig = + (ECReplicationConfig) container.getReplicationConfig(); + final ECContainerReplicaCount replicaCount = + new ECContainerReplicaCount(container, replicas, pendingOps, + remainingMaintenanceRedundancy); + + ContainerHealthResult currentUnderRepRes = ecContainerHealthCheck + .checkHealth(container, replicas, pendingOps, + remainingMaintenanceRedundancy); + + LOG.debug("Handling under-replicated EC container: {}", container); + if (currentUnderRepRes + .getHealthState() != ContainerHealthResult.HealthState + .UNDER_REPLICATED) { + LOG.info("The container {} state changed and it's not in under" + + " replication any more. Current state is: {}", + container.getContainerID(), currentUnderRepRes); + return null; + } + ContainerHealthResult.UnderReplicatedHealthResult containerHealthResult = + ((ContainerHealthResult.UnderReplicatedHealthResult) + currentUnderRepRes); + if (containerHealthResult.isSufficientlyReplicatedAfterPending()) { + LOG.info("The container {} with replicas {} is sufficiently replicated", + container.getContainerID(), replicaCount.getReplicas()); + return null; + } + if (replicaCount.unRecoverable()) { + LOG.warn("The container {} is unrecoverable. The available replicas" + + " are: {}.", container.containerID(), replicaCount.getReplicas()); + } + final ContainerID id = container.containerID(); + try { + // State is UNDER_REPLICATED + final List<DatanodeDetails> deletionInFlight = new ArrayList<>(); + for (ContainerReplicaOp op : pendingOps) { + if (op.getOpType() == ContainerReplicaOp.PendingOpType.DELETE) { + deletionInFlight.add(op.getTarget()); + } + } + List<Integer> missingIndexes = replicaCount.unavailableIndexes(true); + // We got the missing indexes, this is excluded any decommissioning + // indexes. Find the good source nodes. + + if (missingIndexes.size() > 0 && !replicaCount.unRecoverable()) { + List<ContainerReplica> sources = replicas.stream().filter(r -> r + .getState() == StorageContainerDatanodeProtocolProtos + .ContainerReplicaProto.State.CLOSED) + // Exclude stale and dead nodes. This is particularly important for + // maintenance nodes, as the replicas will remain present in the + // container manager, even when they go dead. + .filter(r -> ReplicationManager + .getNodeStatus(r.getDatanodeDetails(), nodeManager).isHealthy()) + .filter(r -> !deletionInFlight.contains(r.getDatanodeDetails())) + .collect(Collectors.toList()); + LOG.debug("Missing indexes detected for the container {}." + + " The missing indexes are {}", id, missingIndexes); + // We have source nodes. 
+ if (sources.size() >= repConfig.getData()) { + final List<DatanodeDetails> selectedDatanodes = + getTargetDatanodes(replicas, container, missingIndexes.size()); + + List<ReconstructECContainersCommand.DatanodeDetailsAndReplicaIndex> + sourceDatanodesWithIndex = new ArrayList<>(); + for (ContainerReplica srcReplica : sources) { + sourceDatanodesWithIndex.add( + new ReconstructECContainersCommand + .DatanodeDetailsAndReplicaIndex( + srcReplica.getDatanodeDetails(), + srcReplica.getReplicaIndex())); + } + + final ReconstructECContainersCommand reconstructionCommand = + new ReconstructECContainersCommand(id.getProtobuf().getId(), + sourceDatanodesWithIndex, selectedDatanodes, + int2byte(missingIndexes), + repConfig); + // Keeping the first target node as coordinator. + return ImmutableMap.of(selectedDatanodes.get(0), + reconstructionCommand); + } else { + LOG.warn("Cannot proceed for EC container reconstruction for {}, due" + + "to insufficient source replicas found. Number of source " + + "replicas needed: {}. Number of available source replicas are:" + + " {}. Available sources are: {}", container.containerID(), + repConfig.getData(), sources.size(), sources); + } + } else if (containerHealthResult.underReplicatedDueToDecommission()) { + Map<DatanodeDetails, SCMCommand> commands = new HashMap<>(); + Set<Integer> decomIndexes = replicaCount.decommissioningIndexes(); + final List<DatanodeDetails> selectedDatanodes = + getTargetDatanodes(replicas, container, decomIndexes.size()); + Iterator<DatanodeDetails> iterator = selectedDatanodes.iterator(); + // In this case we need to do one to one copy. + for (ContainerReplica replica : replicas) { + if (decomIndexes.contains(replica.getReplicaIndex())) { + if (!iterator.hasNext()) { + LOG.warn("Couldn't find the enough targets. Available source" + + " nodes: {}, the target nodes: {} and the decommission" + + " indexes: {}", replicas, selectedDatanodes, decomIndexes); + break; + } + DatanodeDetails decommissioningSrcNode + = replica.getDatanodeDetails(); + final ReplicateContainerCommand replicateCommand = + new ReplicateContainerCommand(id.getProtobuf().getId(), + ImmutableList.of(decommissioningSrcNode)); + DatanodeDetails target = iterator.next(); + commands.put(target, replicateCommand); + } + } + return commands; + } else { + LOG.info("There are no missing indexes or decommissioning indexes" + + " found for {}", id); + } + + } catch (IOException | IllegalStateException ex) { + LOG.warn("Exception while processing for creating the EC reconstruction" + + " container commands for {}.", + id, ex); + } + return null; + } + + private List<DatanodeDetails> getTargetDatanodes( + Set<ContainerReplica> replicas, ContainerInfo container, + int requiredNodes) throws IOException { + // We should ensure that the target datanode has enough space + // for a complete container to be created, but since the container + // size may be changed smaller than origin, we should be defensive. + final long dataSizeRequired = + Math.max(container.getUsedBytes(), currentContainerSize); + final List<DatanodeDetails> excludeList = + replicas.stream().map(ContainerReplica::getDatanodeDetails) + .collect(Collectors.toList()); + + return containerPlacement + .chooseDatanodes(excludeList, null, requiredNodes, 0, dataSizeRequired); + } + + private static byte[] int2byte(List<Integer> src) { + byte[] dst = new byte[src.size()]; + + for (int i = 0; i < src.size(); i++) { + dst[i] = (byte) ((int) src.get(i)); Review Comment: Nit: could use `byteValue` directly? 
```suggestion dst[i] = src.get(i).byteValue(); ``` ########## hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/states/TestECUnderReplicationHandler.java: ########## @@ -0,0 +1,255 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.container.states; + +import com.google.common.collect.ImmutableList; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.hdds.client.ECReplicationConfig; +import org.apache.hadoop.hdds.client.ReplicationConfig; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.PlacementPolicy; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import org.apache.hadoop.hdds.scm.container.MockNodeManager; +import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementMetrics; +import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRackScatter; +import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult; +import org.apache.hadoop.hdds.scm.container.replication.ECUnderReplicationHandler; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.scm.net.NetworkTopology; +import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl; +import org.apache.hadoop.hdds.scm.net.NodeSchema; +import org.apache.hadoop.hdds.scm.net.NodeSchemaManager; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.node.NodeStatus; +import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; +import org.apache.hadoop.ozone.container.common.SCMTestUtils; +import org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand; +import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; +import org.apache.hadoop.ozone.protocol.commands.SCMCommand; +import org.assertj.core.util.Lists; +import org.junit.Assert; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE; +import static 
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED; +import static org.apache.hadoop.hdds.scm.net.NetConstants.LEAF_SCHEMA; +import static org.apache.hadoop.hdds.scm.net.NetConstants.RACK_SCHEMA; +import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_SCHEMA; + +/** + * Tests the ECUnderReplicationHandling functionality. + */ +public class TestECUnderReplicationHandler { + private ECReplicationConfig repConfig; + private ContainerInfo container; + private NodeManager nodeManager; + private OzoneConfiguration conf; + private NetworkTopology cluster; + private PlacementPolicy policy; + private SCMContainerPlacementMetrics metrics; + + @BeforeEach + public void setup() { + nodeManager = new MockNodeManager(true, 10) { + @Override + public NodeStatus getNodeStatus(DatanodeDetails dd) + throws NodeNotFoundException { + return NodeStatus.inServiceHealthy(); + } + }; + conf = SCMTestUtils.getConf(); + repConfig = new ECReplicationConfig(3, 2); + container = createContainer(HddsProtos.LifeCycleState.CLOSED, repConfig); + // create placement policy instances + cluster = new NetworkTopologyImpl(NodeSchemaManager.getInstance()); + policy = + new SCMContainerPlacementRackScatter(nodeManager, conf, cluster, true, + metrics) { + @Override + public List<DatanodeDetails> chooseDatanodes( + List<DatanodeDetails> excludedNodes, + List<DatanodeDetails> favoredNodes, int nodesRequiredToChoose, + long metadataSizeRequired, long dataSizeRequired) + throws SCMException { + List<DatanodeDetails> dns = new ArrayList<>(); + for (int i = 0; i < nodesRequiredToChoose; i++) { + dns.add(MockDatanodeDetails.randomDatanodeDetails()); + } + return dns; + } + }; Review Comment: Maybe I'm missing something, but I think we could use `new SCMCommonPlacementPolicy` here, so we can keep `SCMContainerPlacementRackScatter` as `final`. ```suggestion policy = new SCMCommonPlacementPolicy(nodeManager, conf) { @Override public DatanodeDetails chooseNode(List<DatanodeDetails> healthyNodes) { return null; } }; ``` ########## hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/states/TestECUnderReplicationHandler.java: ########## @@ -0,0 +1,255 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.hdds.scm.container.states; + +import com.google.common.collect.ImmutableList; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.hdds.client.ECReplicationConfig; +import org.apache.hadoop.hdds.client.ReplicationConfig; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.PlacementPolicy; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import org.apache.hadoop.hdds.scm.container.MockNodeManager; +import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementMetrics; +import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRackScatter; +import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult; +import org.apache.hadoop.hdds.scm.container.replication.ECUnderReplicationHandler; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.scm.net.NetworkTopology; +import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl; +import org.apache.hadoop.hdds.scm.net.NodeSchema; +import org.apache.hadoop.hdds.scm.net.NodeSchemaManager; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.node.NodeStatus; +import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; +import org.apache.hadoop.ozone.container.common.SCMTestUtils; +import org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand; +import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; +import org.apache.hadoop.ozone.protocol.commands.SCMCommand; +import org.assertj.core.util.Lists; +import org.junit.Assert; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE; +import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED; +import static org.apache.hadoop.hdds.scm.net.NetConstants.LEAF_SCHEMA; +import static org.apache.hadoop.hdds.scm.net.NetConstants.RACK_SCHEMA; +import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_SCHEMA; + +/** + * Tests the ECUnderReplicationHandling functionality. 
+ */ +public class TestECUnderReplicationHandler { + private ECReplicationConfig repConfig; + private ContainerInfo container; + private NodeManager nodeManager; + private OzoneConfiguration conf; + private NetworkTopology cluster; + private PlacementPolicy policy; + private SCMContainerPlacementMetrics metrics; + + @BeforeEach + public void setup() { + nodeManager = new MockNodeManager(true, 10) { + @Override + public NodeStatus getNodeStatus(DatanodeDetails dd) + throws NodeNotFoundException { + return NodeStatus.inServiceHealthy(); + } + }; + conf = SCMTestUtils.getConf(); + repConfig = new ECReplicationConfig(3, 2); + container = createContainer(HddsProtos.LifeCycleState.CLOSED, repConfig); + // create placement policy instances + cluster = new NetworkTopologyImpl(NodeSchemaManager.getInstance()); + policy = + new SCMContainerPlacementRackScatter(nodeManager, conf, cluster, true, + metrics) { + @Override + public List<DatanodeDetails> chooseDatanodes( + List<DatanodeDetails> excludedNodes, + List<DatanodeDetails> favoredNodes, int nodesRequiredToChoose, + long metadataSizeRequired, long dataSizeRequired) + throws SCMException { + List<DatanodeDetails> dns = new ArrayList<>(); + for (int i = 0; i < nodesRequiredToChoose; i++) { + dns.add(MockDatanodeDetails.randomDatanodeDetails()); + } + return dns; + } + }; + NodeSchema[] schemas = + new NodeSchema[] {ROOT_SCHEMA, RACK_SCHEMA, LEAF_SCHEMA}; + NodeSchemaManager.getInstance().init(schemas, true); + + } + + @BeforeEach + public void init() { + metrics = SCMContainerPlacementMetrics.create(); + } + + @Test + public void testUnderReplicationWithMissingParityIndex5() { + Set<ContainerReplica> availableReplicas = + registerNodes(Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2), + Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4)); + testUnderReplicationWithMissingIndexes(ImmutableList.of(5), + availableReplicas, false); + } + + @Test + public void testUnderReplicationWithMissingIndex34() { + Set<ContainerReplica> availableReplicas = + registerNodes(Pair.of(IN_SERVICE, 1), Pair.of(IN_SERVICE, 2), + Pair.of(IN_SERVICE, 5)); + testUnderReplicationWithMissingIndexes(ImmutableList.of(3, 4), + availableReplicas, false); + } + + @Test + public void testUnderReplicationWithMissingIndex2345() { + Set<ContainerReplica> availableReplicas = + registerNodes(Pair.of(IN_SERVICE, 1)); + testUnderReplicationWithMissingIndexes(ImmutableList.of(2, 3, 4, 5), + availableReplicas, false); + } + + @Test + public void testUnderReplicationWithMissingIndex12345() { + Set<ContainerReplica> availableReplicas = new HashSet<>(); + testUnderReplicationWithMissingIndexes(ImmutableList.of(1, 2, 3, 4, 5), + availableReplicas, false); + } + + @Test + public void testUnderReplicationWithDecomIndex1() { + Set<ContainerReplica> availableReplicas = + registerNodes(Pair.of(DECOMMISSIONING, 1), Pair.of(IN_SERVICE, 2), + Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4), + Pair.of(IN_SERVICE, 5)); + testUnderReplicationWithMissingIndexes(Lists.emptyList(), availableReplicas, + true); + } + + @Test + public void testUnderReplicationWithDecomIndex12() { + Set<ContainerReplica> availableReplicas = + registerNodes(Pair.of(DECOMMISSIONING, 1), Pair.of(DECOMMISSIONING, 2), + Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4), + Pair.of(IN_SERVICE, 5)); + testUnderReplicationWithMissingIndexes(Lists.emptyList(), availableReplicas, + true); + } + + @Test + public void testUnderReplicationWithMixedDecomAndMissingIndexes() { + Set<ContainerReplica> availableReplicas = + 
registerNodes(Pair.of(DECOMMISSIONING, 1), Pair.of(DECOMMISSIONING, 2), + Pair.of(IN_SERVICE, 3), Pair.of(IN_SERVICE, 4)); + testUnderReplicationWithMissingIndexes(ImmutableList.of(5), + availableReplicas, false); + } + + public void testUnderReplicationWithMissingIndexes( + List<Integer> missingIndexes, Set<ContainerReplica> availableReplicas, + boolean decom) { + ECUnderReplicationHandler ecURH = + new ECUnderReplicationHandler(policy, conf, nodeManager); + ContainerHealthResult.UnderReplicatedHealthResult result = + Mockito.mock(ContainerHealthResult.UnderReplicatedHealthResult.class); + Mockito.when(result.underReplicatedDueToDecommission()).thenReturn(decom); + Mockito.when(result.isUnrecoverable()).thenReturn(false); + Mockito.when(result.getContainerInfo()).thenReturn(container); + + Map<DatanodeDetails, SCMCommand> datanodeDetailsSCMCommandMap = ecURH + .processAndCreateCommands(availableReplicas, ImmutableList.of(), + result, 1); + if (!decom) { + if (missingIndexes.size() <= repConfig.getParity()) { + Assertions.assertEquals(1, datanodeDetailsSCMCommandMap.size()); + Map.Entry<DatanodeDetails, SCMCommand> dnVsCommand = + datanodeDetailsSCMCommandMap.entrySet().iterator().next(); + Assert.assertTrue( + dnVsCommand.getValue() instanceof ReconstructECContainersCommand); + byte[] missingIndexesByteArr = new byte[missingIndexes.size()]; + for (int i = 0; i < missingIndexes.size(); i++) { + missingIndexesByteArr[i] = (byte) (int) missingIndexes.get(i); + } + Assert.assertArrayEquals(missingIndexesByteArr, + ((ReconstructECContainersCommand) dnVsCommand.getValue()) + .getMissingContainerIndexes()); + } else { + Assertions.assertNull(datanodeDetailsSCMCommandMap); + } + } else { + int numDecomIndexes = 0; + for (ContainerReplica repl : availableReplicas) { + if (repl.getDatanodeDetails() + .getPersistedOpState() == DECOMMISSIONING) { + numDecomIndexes++; + } + } + Assertions + .assertEquals(numDecomIndexes, datanodeDetailsSCMCommandMap.size()); + Set<Map.Entry<DatanodeDetails, SCMCommand>> entries = + datanodeDetailsSCMCommandMap.entrySet(); + for (Map.Entry<DatanodeDetails, SCMCommand> dnCommand : entries) { + Assert.assertTrue( + dnCommand.getValue() instanceof ReplicateContainerCommand); + } + } + } + + private Set<ContainerReplica> registerNodes( + Pair<HddsProtos.NodeOperationalState, Integer>... states) { + Set<ContainerReplica> replica = new HashSet<>(); + for (Pair<HddsProtos.NodeOperationalState, Integer> s : states) { + DatanodeDetails dn = MockDatanodeDetails.randomDatanodeDetails(); + dn.setPersistedOpState(s.getLeft()); + replica.add(new ContainerReplica.ContainerReplicaBuilder() + .setContainerID(ContainerID.valueOf(1)).setContainerState(CLOSED) + .setDatanodeDetails(dn).setOriginNodeId(dn.getUuid()).setSequenceId(1) + .setReplicaIndex(s.getRight()).build()); + } + return replica; + } + + private ContainerInfo createContainer(HddsProtos.LifeCycleState state, + ReplicationConfig replicationConfig) { + + return new ContainerInfo.Builder() + .setContainerID(ContainerID.valueOf(1).getId()).setState(state) + .setReplicationConfig(replicationConfig).build(); + }

Review Comment: Seems like these are duplicated from `TestECContainerReplicaCount`. Can we reuse instead of copy?
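One option (a sketch only; `ReplicationTestUtil` is a hypothetical name for wherever the shared code would live) is to pull these helpers into a common test utility that both classes call:
```java
// Hypothetical shared test helper, e.g. ReplicationTestUtil#createContainer:
public static ContainerInfo createContainer(HddsProtos.LifeCycleState state,
    ReplicationConfig replicationConfig) {
  // Same body as the private helper above, made static for reuse.
  return new ContainerInfo.Builder()
      .setContainerID(ContainerID.valueOf(1).getId()).setState(state)
      .setReplicationConfig(replicationConfig).build();
}
```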
########## hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ECUnderReplicationHandler.java: ########## @@ -0,0 +1,243 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdds.scm.container.replication; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import org.apache.hadoop.hdds.client.ECReplicationConfig; +import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.conf.StorageUnit; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos; +import org.apache.hadoop.hdds.scm.PlacementPolicy; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import org.apache.hadoop.hdds.scm.container.ECContainerReplicaCount; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand; +import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; +import org.apache.hadoop.ozone.protocol.commands.SCMCommand; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Handles the EC Under replication processing and forming the respective SCM + * commands. + */ +public class ECUnderReplicationHandler implements UnderReplicationHandler { + + public static final Logger LOG = + LoggerFactory.getLogger(ECUnderReplicationHandler.class); + private final ECContainerHealthCheck ecContainerHealthCheck = + new ECContainerHealthCheck(); + private final PlacementPolicy containerPlacement; + private final long currentContainerSize; + private final NodeManager nodeManager; + + public ECUnderReplicationHandler( + final PlacementPolicy containerPlacement, final ConfigurationSource conf, + NodeManager nodeManager) { + this.containerPlacement = containerPlacement; + this.currentContainerSize = (long) conf + .getStorageSize(ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE, + ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE_DEFAULT, StorageUnit.BYTES); + this.nodeManager = nodeManager; + } + + /** + * Identify a new set of datanode(s) to reconstruct the container and form the + * SCM command to send it to DN. In the case of decommission, it will just + * generate the replicate commands instead of reconstruction commands. + * + * @param replicas - An instance of ContainerReplicaCount, containing the + * current replica count and inflight adds and deletes + * @param pendingOps - Inflight replications and deletion ops. + * @param result - Health check result.
+ * @param remainingMaintenanceRedundancy - represents that how many nodes go + * into maintenance. + * @return Returns the key value pair of destination dn where the command gets + * executed and the command itself. + */ + @Override + public Map<DatanodeDetails, SCMCommand> processAndCreateCommands( + final Set<ContainerReplica> replicas, + final List<ContainerReplicaOp> pendingOps, + final ContainerHealthResult result, + final int remainingMaintenanceRedundancy) { + ContainerInfo container = result.getContainerInfo(); + ECReplicationConfig repConfig = + (ECReplicationConfig) container.getReplicationConfig(); + final ECContainerReplicaCount replicaCount = + new ECContainerReplicaCount(container, replicas, pendingOps, + remainingMaintenanceRedundancy); + + ContainerHealthResult currentUnderRepRes = ecContainerHealthCheck + .checkHealth(container, replicas, pendingOps, + remainingMaintenanceRedundancy); + + LOG.debug("Handling under-replicated EC container: {}", container); + if (currentUnderRepRes + .getHealthState() != ContainerHealthResult.HealthState + .UNDER_REPLICATED) { + LOG.info("The container {} state changed and it's not in under" + + " replication any more. Current state is: {}", + container.getContainerID(), currentUnderRepRes); + return null;

Review Comment: Nit: I think it would be safer to return an empty map instead of null. (Note: (1) needs an import, (2) also applies to the other return statements, (3) needs a tweak in the test.)
```suggestion
      return emptyMap();
```
https://github.com/apache/ozone/blob/2fd9fce6582b94faa1dd5a70535af22f179ec55a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/states/TestECUnderReplicationHandler.java#L213
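For (1), that would presumably be the static import:
```java
import static java.util.Collections.emptyMap;
```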
########## hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/states/TestECUnderReplicationHandler.java: ########## @@ -0,0 +1,255 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ +package org.apache.hadoop.hdds.scm.container.states; + +import com.google.common.collect.ImmutableList; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.hdds.client.ECReplicationConfig; +import org.apache.hadoop.hdds.client.ReplicationConfig; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.MockDatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos; +import org.apache.hadoop.hdds.scm.PlacementPolicy; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.container.ContainerInfo; +import org.apache.hadoop.hdds.scm.container.ContainerReplica; +import org.apache.hadoop.hdds.scm.container.MockNodeManager; +import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementMetrics; +import org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementRackScatter; +import org.apache.hadoop.hdds.scm.container.replication.ContainerHealthResult; +import org.apache.hadoop.hdds.scm.container.replication.ECUnderReplicationHandler; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.scm.net.NetworkTopology; +import org.apache.hadoop.hdds.scm.net.NetworkTopologyImpl; +import org.apache.hadoop.hdds.scm.net.NodeSchema; +import org.apache.hadoop.hdds.scm.net.NodeSchemaManager; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.node.NodeStatus; +import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException; +import org.apache.hadoop.ozone.container.common.SCMTestUtils; +import org.apache.hadoop.ozone.protocol.commands.ReconstructECContainersCommand; +import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand; +import org.apache.hadoop.ozone.protocol.commands.SCMCommand; +import org.assertj.core.util.Lists; +import org.junit.Assert; +import org.junit.jupiter.api.Assertions; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.mockito.Mockito; + +import java.util.ArrayList; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONING; +import static org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.IN_SERVICE; +import static org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State.CLOSED; +import static org.apache.hadoop.hdds.scm.net.NetConstants.LEAF_SCHEMA; +import static org.apache.hadoop.hdds.scm.net.NetConstants.RACK_SCHEMA; +import static org.apache.hadoop.hdds.scm.net.NetConstants.ROOT_SCHEMA; + +/** + * Tests the ECUnderReplicationHandling functionality.
+ */ +public class TestECUnderReplicationHandler { + private ECReplicationConfig repConfig; + private ContainerInfo container; + private NodeManager nodeManager; + private OzoneConfiguration conf; + private NetworkTopology cluster; + private PlacementPolicy policy; + private SCMContainerPlacementMetrics metrics; + + @BeforeEach + public void setup() { + nodeManager = new MockNodeManager(true, 10) { + @Override + public NodeStatus getNodeStatus(DatanodeDetails dd) + throws NodeNotFoundException { + return NodeStatus.inServiceHealthy(); + } + }; + conf = SCMTestUtils.getConf(); + repConfig = new ECReplicationConfig(3, 2); + container = createContainer(HddsProtos.LifeCycleState.CLOSED, repConfig); + // create placement policy instances + cluster = new NetworkTopologyImpl(NodeSchemaManager.getInstance()); + policy = + new SCMContainerPlacementRackScatter(nodeManager, conf, cluster, true, + metrics) { + @Override + public List<DatanodeDetails> chooseDatanodes( + List<DatanodeDetails> excludedNodes, + List<DatanodeDetails> favoredNodes, int nodesRequiredToChoose, + long metadataSizeRequired, long dataSizeRequired) + throws SCMException { + List<DatanodeDetails> dns = new ArrayList<>(); + for (int i = 0; i < nodesRequiredToChoose; i++) { + dns.add(MockDatanodeDetails.randomDatanodeDetails()); + } + return dns; + } + }; + NodeSchema[] schemas = + new NodeSchema[] {ROOT_SCHEMA, RACK_SCHEMA, LEAF_SCHEMA}; + NodeSchemaManager.getInstance().init(schemas, true); + + } + + @BeforeEach + public void init() { + metrics = SCMContainerPlacementMetrics.create(); + } Review Comment: Can be merged with `setup()`? -- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. To unsubscribe, e-mail: [email protected] For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
