JacksonYao287 commented on code in PR #3425:
URL: https://github.com/apache/ozone/pull/3425#discussion_r875668025
##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerManagerImpl.java:
##########
@@ -331,6 +341,10 @@ public void removeContainerReplica(final ContainerID cid,
throws ContainerNotFoundException, ContainerReplicaNotFoundException {
if (containerExist(cid)) {
containerStateManager.removeContainerReplica(cid, replica);
+ // Remove any pending delete replication operations for the deleted
+ // replica.
Review Comment:
one question: what is criteria of the completion of ecreconstruction
option?
will that be all the reconstructed replicas are seen?
this could be addressed in a separate jira
##########
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/replication/ContainerReplicaPendingOps.java:
##########
@@ -0,0 +1,227 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.container.replication;
+
+import com.google.common.util.concurrent.Striped;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+
+import java.time.Clock;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReadWriteLock;
+
+import static
org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType.ADD;
+import static
org.apache.hadoop.hdds.scm.container.replication.ContainerReplicaOp.PendingOpType.DELETE;
+
+/**
+ * Class to track pending replication operations across the cluster. For
+ * each container with a pending replication or pending delete, there will
+ * be an entry in this class mapping ContainerID to a list of the pending
+ * operations.
+ */
+public class ContainerReplicaPendingOps {
+
+ private final ConfigurationSource config;
+ private final Clock clock;
+ private final ConcurrentHashMap<ContainerID, List<ContainerReplicaOp>>
+ pendingOps = new ConcurrentHashMap<>();
+ private final Striped<ReadWriteLock> stripedLock = Striped.readWriteLock(64);
+
+ public ContainerReplicaPendingOps(final ConfigurationSource conf,
+ Clock clock) {
+ this.config = conf;
+ this.clock = clock;
+ }
+
+ /**
+ * Get all the ContainerReplicaOp's associated with the given ContainerID.
+ * A new list is created and returned, so it can be modified by the caller,
+ * but any changes will not be reflected in the internal map.
+ * @param containerID The ContainerID for which to retrieve the pending
+ * ops.
+ * @return Standalone list of ContainerReplica or an empty list if none
exist.
+ */
+ public List<ContainerReplicaOp> getPendingOps(ContainerID containerID) {
+ Lock lock = readLock(containerID);
+ lock.lock();
+ try {
+ List<ContainerReplicaOp> ops = pendingOps.get(containerID);
+ if (ops == null) {
+ return Collections.emptyList();
+ }
+ return new ArrayList<>(ops);
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ /**
+ * Store a ContainerReplicaOp to add a replica for the given ContainerID.
+ * @param containerID ContainerID for which to add a replica
+ * @param target The target datanode
+ * @param replicaIndex The replica index (zero for Ratis, > 0 for EC)
+ */
+ public void scheduleAddReplica(ContainerID containerID,
+ DatanodeDetails target, int replicaIndex) {
+ addReplica(ADD, containerID, target, replicaIndex);
+ }
+
+ /**
+ * Store a ContainerReplicaOp to delete a replica for the given ContainerID.
+ * @param containerID ContainerID for which to delete a replica
+ * @param target The target datanode
+ * @param replicaIndex The replica index (zero for Ratis, > 0 for EC)
+ */
+ public void scheduleDeleteReplica(ContainerID containerID,
+ DatanodeDetails target, int replicaIndex) {
+ addReplica(DELETE, containerID, target, replicaIndex);
+ }
+
+ /**
+ * Remove a stored ContainerReplicaOp from the given ContainerID as it has
+ * been replicated successfully.
+ * @param containerID ContainerID for which to complete the replication
+ * @param target The target Datanode
+ * @param replicaIndex The replica index (zero for Ratis, > 0 for EC)
+ * @return True if a pending replica was found and removed, false otherwise.
+ */
+ public boolean completeAddReplica(ContainerID containerID,
+ DatanodeDetails target, int replicaIndex) {
+ return completeOp(ADD, containerID, target, replicaIndex);
+ }
+
+
+ /**
+ * Remove a stored ContainerReplicaOp from the given ContainerID as it has
+ * been deleted successfully.
+ * @param containerID ContainerID for which to complete the deletion
+ * @param target The target Datanode
+ * @param replicaIndex The replica index (zero for Ratis, > 0 for EC)
+ * @return True if a pending replica was found and removed, false otherwise.
+ */
+ public boolean completeDeleteReplica(ContainerID containerID,
+ DatanodeDetails target, int replicaIndex) {
+ return completeOp(DELETE, containerID, target, replicaIndex);
+ }
+
+ /**
+ * Remove a stored pending operation from the given ContainerID.
+ * @param containerID ContainerID for which to remove the op.
+ * @param op ContainerReplicaOp to remove
+ * @return True if an element was found and deleted, false otherwise.
+ */
+ public boolean removeOp(ContainerID containerID,
+ ContainerReplicaOp op) {
+ return completeOp(op.getOpType(), containerID, op.getTarget(),
+ op.getReplicaIndex());
+ }
+
+ /**
+ * Iterate over all pending entries and remove any which have expired,
meaning
+ * they have not completed the operation inside the given time.
+ * @param expiryMilliSeconds
+ */
+ public void removeExpiredEntries(long expiryMilliSeconds) {
+ for (ContainerID containerID : pendingOps.keySet()) {
+ // Rather than use an entry set, we get the map entry again. This is
+ // to protect against another thread modifying the value after this
+ // iterator started. Once we lock on the ContainerID object, no other
+ // changes can occur to the list of ops associated with it.
+ Lock lock = writeLock(containerID);
+ lock.lock();
+ try {
+ List<ContainerReplicaOp> ops = pendingOps.get(containerID);
+ if (ops == null) {
+ // There should not be null entries, but another thread may have
+ // removed the map entry after the iterator was started.
+ continue;
+ }
+ Iterator<ContainerReplicaOp> iterator = ops.listIterator();
+ while (iterator.hasNext()) {
+ ContainerReplicaOp op = iterator.next();
+ if (op.getScheduledEpochMillis() + expiryMilliSeconds
+ < clock.millis()) {
+ iterator.remove();
+ }
+ }
+ if (ops.size() == 0) {
+ pendingOps.remove(containerID);
+ }
+ } finally {
+ lock.unlock();
+ }
+ }
+ }
+
+ private void addReplica(ContainerReplicaOp.PendingOpType opType,
+ ContainerID containerID, DatanodeDetails target, int replicaIndex) {
+ Lock lock = writeLock(containerID);
+ lock.lock();
+ try {
+ List<ContainerReplicaOp> ops = pendingOps.computeIfAbsent(
+ containerID, s -> new ArrayList<>());
+ ops.add(new ContainerReplicaOp(opType,
+ target, replicaIndex, clock.millis()));
+ } finally {
+ lock.unlock();
+ }
+ }
+
+ private boolean completeOp(ContainerReplicaOp.PendingOpType opType,
+ ContainerID containerID, DatanodeDetails target, int replicaIndex) {
+ boolean found = false;
+ Lock lock = writeLock(containerID);
+ lock.lock();
+ try {
+ List<ContainerReplicaOp> ops = pendingOps.get(containerID);
+ if (ops != null) {
+ Iterator<ContainerReplicaOp> iterator = ops.listIterator();
+ while (iterator.hasNext()) {
+ ContainerReplicaOp op = iterator.next();
+ if (op.getOpType() == opType
+ && op.getTarget().equals(target)
+ && op.getReplicaIndex() == replicaIndex) {
+ found = true;
Review Comment:
NIT: can we add rewrite `equals` function for `ContainerReplicaOp`?
--
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
To unsubscribe, e-mail: [email protected]
For queries about this service, please contact Infrastructure at:
[email protected]
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]