This is an automated email from the ASF dual-hosted git repository.
ljain pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new 05c7a98 HDDS-5253. Support container move HA (#2488)
05c7a98 is described below
commit 05c7a98ede0d9f1d5995ad5827055db9a9866bc7
Author: Jackson Yao <[email protected]>
AuthorDate: Mon Aug 16 19:17:13 2021 +0800
HDDS-5253. Support container move HA (#2488)
---
.../container/common/helpers/MoveDataNodePair.java | 71 ++++
.../hadoop/hdds/scm/metadata/SCMMetadataStore.java | 6 +
.../interface-client/src/main/proto/hdds.proto | 5 +
.../src/main/proto/SCMRatisProtocol.proto | 1 +
.../hadoop/hdds/scm/block/BlockManagerImpl.java | 8 +-
.../hdds/scm/block/SCMBlockDeletingService.java | 5 +-
.../hdds/scm/container/ContainerReplicaCount.java | 1 +
.../hdds/scm/container/ReplicationManager.java | 464 +++++++++++++++++----
.../hadoop/hdds/scm/ha/SCMHAManagerImpl.java | 2 +
.../hdds/scm/metadata/MoveDataNodePairCodec.java | 53 +++
.../hadoop/hdds/scm/metadata/SCMDBDefinition.java | 13 +-
.../hdds/scm/metadata/SCMMetadataStoreImpl.java | 13 +
.../hdds/scm/server/StorageContainerManager.java | 4 +-
.../hdds/scm/container/TestReplicationManager.java | 229 +++++++---
.../hdds/scm/node/TestDatanodeAdminMonitor.java | 1 +
...TestSCMStoreImplWithOldPipelineIDKeyFormat.java | 6 +
.../hadoop/ozone/TestStorageContainerManager.java | 7 +-
.../hadoop/ozone/scm/TestFailoverWithSCMHA.java | 91 ++++
.../scm/node/TestDecommissionAndMaintenance.java | 6 +-
19 files changed, 826 insertions(+), 160 deletions(-)
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/MoveDataNodePair.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/MoveDataNodePair.java
new file mode 100644
index 0000000..578134e
--- /dev/null
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/container/common/helpers/MoveDataNodePair.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations
+ * under the License.
+ */
+
+
+package org.apache.hadoop.hdds.scm.container.common.helpers;
+
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.ratis.util.Preconditions;
+
+import java.io.IOException;
+
+/**
+ * MoveDataNodePair encapsulates the source and target
+ * datanodes of a move option.
+ */
+public class MoveDataNodePair {
+ /**
+ * source datanode of current move option.
+ */
+ private final DatanodeDetails src;
+
+ /**
+ * target datanode of current move option.
+ */
+ private final DatanodeDetails tgt;
+
+ public MoveDataNodePair(DatanodeDetails src, DatanodeDetails tgt) {
+ this.src = src;
+ this.tgt = tgt;
+ }
+
+ public DatanodeDetails getTgt() {
+ return tgt;
+ }
+
+ public DatanodeDetails getSrc() {
+ return src;
+ }
+
+ public HddsProtos.MoveDataNodePairProto getProtobufMessage(int clientVersion)
+ throws IOException {
+ HddsProtos.MoveDataNodePairProto.Builder builder =
+ HddsProtos.MoveDataNodePairProto.newBuilder()
+ .setSrc(src.toProto(clientVersion))
+ .setTgt(tgt.toProto(clientVersion));
+ return builder.build();
+ }
+
+ public static MoveDataNodePair getFromProtobuf(
+ HddsProtos.MoveDataNodePairProto mdnpp) {
+ Preconditions.assertNotNull(mdnpp, "MoveDataNodePair is null");
+ DatanodeDetails src = DatanodeDetails.getFromProtoBuf(mdnpp.getSrc());
+ DatanodeDetails tgt = DatanodeDetails.getFromProtoBuf(mdnpp.getTgt());
+ return new MoveDataNodePair(src, tgt);
+ }
+}
diff --git
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java
index 95dc477..a63d90d0 100644
---
a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java
+++
b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStore.java
@@ -25,6 +25,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
import org.apache.hadoop.hdds.security.x509.certificate.CertInfo;
import org.apache.hadoop.hdds.utils.DBStoreHAManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -147,4 +148,9 @@ public interface SCMMetadataStore extends DBStoreHAManager {
* Table that maintains sequence id information.
*/
Table<String, Long> getSequenceIdTable();
+
+ /**
+ * Table that maintains move information.
+ */
+ Table<ContainerID, MoveDataNodePair> getMoveTable();
}
diff --git a/hadoop-hdds/interface-client/src/main/proto/hdds.proto
b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
index f115d0c..ee50354 100644
--- a/hadoop-hdds/interface-client/src/main/proto/hdds.proto
+++ b/hadoop-hdds/interface-client/src/main/proto/hdds.proto
@@ -62,6 +62,11 @@ message ExtendedDatanodeDetailsProto {
optional string buildDate = 5;
}
+message MoveDataNodePairProto {
+ required DatanodeDetailsProto src = 1;
+ required DatanodeDetailsProto tgt = 2;
+}
+
/**
Proto message encapsulating information required to uniquely identify a
OzoneManager.
diff --git a/hadoop-hdds/interface-server/src/main/proto/SCMRatisProtocol.proto
b/hadoop-hdds/interface-server/src/main/proto/SCMRatisProtocol.proto
index 8066c8b..8e4237e 100644
--- a/hadoop-hdds/interface-server/src/main/proto/SCMRatisProtocol.proto
+++ b/hadoop-hdds/interface-server/src/main/proto/SCMRatisProtocol.proto
@@ -26,6 +26,7 @@ enum RequestType {
BLOCK = 3;
SEQUENCE_ID = 4;
CERT_STORE = 5;
+ MOVE = 6;
}
message Method {
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index 6a009f3..1673f30 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -32,10 +32,8 @@ import org.apache.hadoop.hdds.client.ReplicationConfig;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.StorageUnit;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.PipelineChoosePolicy;
import org.apache.hadoop.hdds.scm.ScmConfig;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
-import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
import org.apache.hadoop.hdds.scm.container.common.helpers.AllocatedBlock;
import org.apache.hadoop.hdds.scm.container.common.helpers.ExcludeList;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
@@ -69,7 +67,6 @@ public class BlockManagerImpl implements BlockManager,
BlockmanagerMXBean {
private final StorageContainerManager scm;
private final PipelineManager pipelineManager;
- private final ContainerManagerV2 containerManager;
private final WritableContainerFactory writableContainerFactory;
private final long containerSize;
@@ -78,7 +75,6 @@ public class BlockManagerImpl implements BlockManager,
BlockmanagerMXBean {
private final SCMBlockDeletingService blockDeletingService;
private ObjectName mxBean;
- private final PipelineChoosePolicy pipelineChoosePolicy;
private final SequenceIdGenerator sequenceIdGen;
private ScmBlockDeletingServiceMetrics metrics;
/**
@@ -94,8 +90,6 @@ public class BlockManagerImpl implements BlockManager,
BlockmanagerMXBean {
Objects.requireNonNull(scm, "SCM cannot be null");
this.scm = scm;
this.pipelineManager = scm.getPipelineManager();
- this.containerManager = scm.getContainerManager();
- this.pipelineChoosePolicy = scm.getPipelineChoosePolicy();
this.sequenceIdGen = scm.getSequenceIdGen();
this.containerSize = (long)conf.getStorageSize(
ScmConfigKeys.OZONE_SCM_CONTAINER_SIZE,
@@ -123,7 +117,7 @@ public class BlockManagerImpl implements BlockManager,
BlockmanagerMXBean {
OZONE_BLOCK_DELETING_SERVICE_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
blockDeletingService =
- new SCMBlockDeletingService(deletedBlockLog, containerManager,
+ new SCMBlockDeletingService(deletedBlockLog,
scm.getScmNodeManager(), scm.getEventQueue(), scm.getScmContext(),
scm.getSCMServiceManager(), svcInterval, serviceTimeout, conf,
metrics);
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
index 7776c56..cd50e7d 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
@@ -29,7 +29,6 @@ import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.hdds.scm.ScmConfig;
-import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.ha.SCMService;
@@ -67,7 +66,6 @@ public class SCMBlockDeletingService extends BackgroundService
private static final int BLOCK_DELETING_SERVICE_CORE_POOL_SIZE = 1;
private final DeletedBlockLog deletedBlockLog;
- private final ContainerManagerV2 containerManager;
private final NodeManager nodeManager;
private final EventPublisher eventPublisher;
private final SCMContext scmContext;
@@ -83,14 +81,13 @@ public class SCMBlockDeletingService extends
BackgroundService
@SuppressWarnings("parameternumber")
public SCMBlockDeletingService(DeletedBlockLog deletedBlockLog,
- ContainerManagerV2 containerManager, NodeManager nodeManager,
+ NodeManager nodeManager,
EventPublisher eventPublisher, SCMContext scmContext,
SCMServiceManager serviceManager, Duration interval, long serviceTimeout,
ConfigurationSource conf, ScmBlockDeletingServiceMetrics metrics) {
super("SCMBlockDeletingService", interval.toMillis(),
TimeUnit.MILLISECONDS,
BLOCK_DELETING_SERVICE_CORE_POOL_SIZE, serviceTimeout);
this.deletedBlockLog = deletedBlockLog;
- this.containerManager = containerManager;
this.nodeManager = nodeManager;
this.eventPublisher = eventPublisher;
this.scmContext = scmContext;
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaCount.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaCount.java
index bf8c3b9..26368b4 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaCount.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ContainerReplicaCount.java
@@ -18,6 +18,7 @@
package org.apache.hadoop.hdds.scm.container;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+
import java.util.Set;
import static
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState.DECOMMISSIONED;
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
index 97fda61..009c850 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hdds.scm.container;
import java.io.IOException;
+import java.lang.reflect.Proxy;
import java.time.Clock;
import java.time.Duration;
import java.util.ArrayList;
@@ -27,6 +28,7 @@ import java.util.Comparator;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
+import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
@@ -41,8 +43,6 @@ import java.util.function.Predicate;
import java.util.stream.Collectors;
import com.google.common.annotations.VisibleForTesting;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.Config;
import org.apache.hadoop.hdds.conf.ConfigGroup;
@@ -55,18 +55,27 @@ import
org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeState;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.NodeOperationalState;
import org.apache.hadoop.hdds.protocol.proto.
StorageContainerDatanodeProtocolProtos.ContainerReplicaProto.State;
+import static org.apache.hadoop.hdds.protocol.proto.
+ SCMRatisProtocol.RequestType.MOVE;
import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import
org.apache.hadoop.hdds.scm.container.replication.ReplicationManagerMetrics;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMHAInvocationHandler;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMRatisServer;
import org.apache.hadoop.hdds.scm.ha.SCMService;
import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
+import org.apache.hadoop.hdds.scm.metadata.Replicate;
+import org.apache.hadoop.hdds.scm.metadata.DBTransactionBuffer;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.server.events.EventPublisher;
+import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
+import org.apache.hadoop.hdds.utils.db.TableIterator;
import
org.apache.hadoop.ozone.common.statemachine.InvalidStateTransitionException;
import org.apache.hadoop.ozone.protocol.commands.CloseContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
@@ -74,6 +83,8 @@ import
org.apache.hadoop.ozone.protocol.commands.DeleteContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.ReplicateContainerCommand;
import org.apache.hadoop.ozone.protocol.commands.SCMCommand;
import org.apache.hadoop.util.ExitUtil;
+import org.apache.hadoop.hdds.utils.db.Table;
+import static org.apache.hadoop.ozone.ClientVersions.CURRENT_VERSION;
import com.google.protobuf.GeneratedMessage;
import static org.apache.hadoop.hdds.conf.ConfigTag.OZONE;
@@ -134,12 +145,6 @@ public class ReplicationManager implements SCMService {
*/
private final Map<ContainerID, List<InflightAction>> inflightDeletion;
- /**
- * This is used for tracking container move commands
- * which are not yet complete.
- */
- private final Map<ContainerID,
- Pair<DatanodeDetails, DatanodeDetails>> inflightMove;
/**
* This is used for indicating the result of move option and
@@ -150,7 +155,9 @@ public class ReplicationManager implements SCMService {
// both replication and deletion are completed
COMPLETED,
// RM is not running
- RM_NOT_RUNNING,
+ FAIL_NOT_RUNNING,
+ // RM is not ratis leader
+ FAIL_NOT_LEADER,
// replication fail because the container does not exist in src
REPLICATION_FAIL_NOT_EXIST_IN_SOURCE,
// replication fail because the container exists in target
@@ -184,7 +191,9 @@ public class ReplicationManager implements SCMService {
//unexpected action, remove src at inflightReplication
UNEXPECTED_REMOVE_SOURCE_AT_INFLIGHT_REPLICATION,
//unexpected action, remove target at inflightDeletion
- UNEXPECTED_REMOVE_TARGET_AT_INFLIGHT_DELETION
+ UNEXPECTED_REMOVE_TARGET_AT_INFLIGHT_DELETION,
+ //write DB error
+ FAIL_CAN_NOT_RECORD_TO_DB
}
/**
@@ -233,6 +242,11 @@ public class ReplicationManager implements SCMService {
private ReplicationManagerMetrics metrics;
/**
+ * scheduler move option.
+ */
+ private final MoveScheduler moveScheduler;
+
+ /**
* Constructs ReplicationManager instance with the given configuration.
*
* @param conf OzoneConfiguration
@@ -242,13 +256,16 @@ public class ReplicationManager implements SCMService {
*/
@SuppressWarnings("parameternumber")
public ReplicationManager(final ConfigurationSource conf,
- final ContainerManagerV2 containerManager,
- final PlacementPolicy containerPlacement,
- final EventPublisher eventPublisher,
- final SCMContext scmContext,
- final SCMServiceManager serviceManager,
- final NodeManager nodeManager,
- final java.time.Clock clock) {
+ final ContainerManagerV2 containerManager,
+ final PlacementPolicy containerPlacement,
+ final EventPublisher eventPublisher,
+ final SCMContext scmContext,
+ final SCMServiceManager serviceManager,
+ final NodeManager nodeManager,
+ final java.time.Clock clock,
+ final SCMHAManager scmhaManager,
+ final Table<ContainerID, MoveDataNodePair> moveTable)
+ throws IOException {
this.containerManager = containerManager;
this.containerPlacement = containerPlacement;
this.eventPublisher = eventPublisher;
@@ -258,7 +275,6 @@ public class ReplicationManager implements SCMService {
this.running = false;
this.inflightReplication = new ConcurrentHashMap<>();
this.inflightDeletion = new ConcurrentHashMap<>();
- this.inflightMove = new ConcurrentHashMap<>();
this.inflightMoveFuture = new ConcurrentHashMap<>();
this.minHealthyForMaintenance = rmConf.getMaintenanceReplicaMinimum();
this.clock = clock;
@@ -269,6 +285,11 @@ public class ReplicationManager implements SCMService {
TimeUnit.MILLISECONDS);
this.metrics = null;
+ moveScheduler = new MoveSchedulerImpl.Builder()
+ .setDBTransactionBuffer(scmhaManager.getDBTransactionBuffer())
+ .setRatisServer(scmhaManager.getRatisServer())
+ .setMoveTable(moveTable).build();
+
// register ReplicationManager to SCMServiceManager.
serviceManager.register(this);
@@ -317,9 +338,6 @@ public class ReplicationManager implements SCMService {
LOG.info("Stopping Replication Monitor Thread.");
inflightReplication.clear();
inflightDeletion.clear();
- //TODO: replicate inflight move through ratis
- inflightMove.clear();
- inflightMoveFuture.clear();
running = false;
metrics.unRegister();
notifyAll();
@@ -581,15 +599,15 @@ public class ReplicationManager implements SCMService {
throws ContainerNotFoundException {
// make sure inflightMove contains the container
ContainerID id = container.containerID();
- if (!inflightMove.containsKey(id)) {
- return;
- }
// make sure the datanode , which is removed from inflightActions,
// is source or target datanode.
- Pair<DatanodeDetails, DatanodeDetails> kv = inflightMove.get(id);
- final boolean isSource = kv.getKey().equals(dn);
- final boolean isTarget = kv.getValue().equals(dn);
+ MoveDataNodePair kv = moveScheduler.getMoveDataNodePair(id);
+ if (kv == null) {
+ return;
+ }
+ final boolean isSource = kv.getSrc().equals(dn);
+ final boolean isTarget = kv.getTgt().equals(dn);
if (!isSource && !isTarget) {
return;
}
@@ -610,50 +628,50 @@ public class ReplicationManager implements SCMService {
*/
if (isSource && isInflightReplication) {
- inflightMoveFuture.get(id).complete(
+ //if RM is reinitialized, inflightMove will be restored,
+ //but inflightMoveFuture will not. so there may be a case where a
+ //container is in inflightMove, but not in inflightMoveFuture.
+ compleleteMoveFutureWithResult(id,
MoveResult.UNEXPECTED_REMOVE_SOURCE_AT_INFLIGHT_REPLICATION);
- inflightMove.remove(id);
- inflightMoveFuture.remove(id);
+ moveScheduler.completeMove(id.getProtobuf());
return;
}
if (isTarget && !isInflightReplication) {
- inflightMoveFuture.get(id).complete(
+ compleleteMoveFutureWithResult(id,
MoveResult.UNEXPECTED_REMOVE_TARGET_AT_INFLIGHT_DELETION);
- inflightMove.remove(id);
- inflightMoveFuture.remove(id);
+ moveScheduler.completeMove(id.getProtobuf());
return;
}
if (!(isInflightReplication && isCompleted)) {
if (isInflightReplication) {
if (isUnhealthy) {
- inflightMoveFuture.get(id).complete(
+ compleleteMoveFutureWithResult(id,
MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
} else if (isNotInService) {
- inflightMoveFuture.get(id).complete(
+ compleleteMoveFutureWithResult(id,
MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE);
} else {
- inflightMoveFuture.get(id).complete(
+ compleleteMoveFutureWithResult(id,
MoveResult.REPLICATION_FAIL_TIME_OUT);
}
} else {
if (isUnhealthy) {
- inflightMoveFuture.get(id).complete(
+ compleleteMoveFutureWithResult(id,
MoveResult.DELETION_FAIL_NODE_UNHEALTHY);
} else if (isTimeout) {
- inflightMoveFuture.get(id).complete(
+ compleleteMoveFutureWithResult(id,
MoveResult.DELETION_FAIL_TIME_OUT);
} else if (isNotInService) {
- inflightMoveFuture.get(id).complete(
+ compleleteMoveFutureWithResult(id,
MoveResult.DELETION_FAIL_NODE_NOT_IN_SERVICE);
} else {
- inflightMoveFuture.get(id).complete(
+ compleleteMoveFutureWithResult(id,
MoveResult.COMPLETED);
}
}
- inflightMove.remove(id);
- inflightMoveFuture.remove(id);
+ moveScheduler.completeMove(id.getProtobuf());
} else {
deleteSrcDnForMove(container,
containerManager.getContainerReplicas(id));
@@ -664,15 +682,32 @@ public class ReplicationManager implements SCMService {
* add a move action for a given container.
*
* @param cid Container to move
- * @param srcDn datanode to move from
- * @param targetDn datanode to move to
+ * @param src source datanode
+ * @param tgt target datanode
+ */
+ public CompletableFuture<MoveResult> move(ContainerID cid,
+ DatanodeDetails src, DatanodeDetails tgt)
+ throws ContainerNotFoundException, NodeNotFoundException {
+ return move(cid, new MoveDataNodePair(src, tgt));
+ }
+
+ /**
+ * add a move action for a given container.
+ *
+ * @param cid Container to move
+ * @param mp MoveDataNodePair which contains source and target datanodes
*/
public CompletableFuture<MoveResult> move(ContainerID cid,
- DatanodeDetails srcDn, DatanodeDetails targetDn)
+ MoveDataNodePair mp)
throws ContainerNotFoundException, NodeNotFoundException {
CompletableFuture<MoveResult> ret = new CompletableFuture<>();
if (!isRunning()) {
- ret.complete(MoveResult.RM_NOT_RUNNING);
+ ret.complete(MoveResult.FAIL_NOT_RUNNING);
+ return ret;
+ }
+
+ if (!scmContext.isLeader()) {
+ ret.complete(MoveResult.FAIL_NOT_LEADER);
return ret;
}
@@ -693,6 +728,8 @@ public class ReplicationManager implements SCMService {
* of deletion always depends on placement policy
*/
+ DatanodeDetails srcDn = mp.getSrc();
+ DatanodeDetails targetDn = mp.getTgt();
NodeStatus currentNodeStat = nodeManager.getNodeStatus(srcDn);
NodeState healthStat = currentNodeStat.getHealth();
NodeOperationalState operationalState =
@@ -777,7 +814,15 @@ public class ReplicationManager implements SCMService {
return ret;
}
- inflightMove.putIfAbsent(cid, new ImmutablePair<>(srcDn, targetDn));
+ try {
+ moveScheduler.startMove(cid.getProtobuf(),
+ mp.getProtobufMessage(CURRENT_VERSION));
+ } catch (IOException e) {
+ LOG.warn("Exception while starting move {}", cid);
+ ret.complete(MoveResult.FAIL_CAN_NOT_RECORD_TO_DB);
+ return ret;
+ }
+
inflightMoveFuture.putIfAbsent(cid, ret);
sendReplicateCommand(cif, targetDn, Collections.singletonList(srcDn));
}
@@ -1213,47 +1258,45 @@ public class ReplicationManager implements SCMService {
private void deleteSrcDnForMove(final ContainerInfo cif,
final Set<ContainerReplica> replicaSet) {
final ContainerID cid = cif.containerID();
- if (inflightMove.containsKey(cid)) {
- Pair<DatanodeDetails, DatanodeDetails> movePair =
- inflightMove.get(cid);
- final DatanodeDetails srcDn = movePair.getKey();
- ContainerReplicaCount replicaCount =
- getContainerReplicaCount(cif, replicaSet);
-
- if(!replicaSet.stream()
- .anyMatch(r -> r.getDatanodeDetails().equals(srcDn))){
- // if the target is present but source disappears somehow,
- // we can consider move is successful.
- inflightMoveFuture.get(cid).complete(MoveResult.COMPLETED);
- inflightMove.remove(cid);
- inflightMoveFuture.remove(cid);
- return;
- }
+ MoveDataNodePair movePair = moveScheduler.getMoveDataNodePair(cid);
+ if (movePair == null) {
+ return;
+ }
+ final DatanodeDetails srcDn = movePair.getSrc();
+ ContainerReplicaCount replicaCount =
+ getContainerReplicaCount(cif, replicaSet);
+
+ if(!replicaSet.stream()
+ .anyMatch(r -> r.getDatanodeDetails().equals(srcDn))){
+ // if the target is present but source disappears somehow,
+ // we can consider move is successful.
+ compleleteMoveFutureWithResult(cid, MoveResult.COMPLETED);
+ moveScheduler.completeMove(cid.getProtobuf());
+ return;
+ }
- int replicationFactor =
- cif.getReplicationConfig().getRequiredNodes();
- ContainerPlacementStatus currentCPS =
- getPlacementStatus(replicaSet, replicationFactor);
- Set<ContainerReplica> newReplicaSet = replicaSet.
- stream().collect(Collectors.toSet());
- newReplicaSet.removeIf(r -> r.getDatanodeDetails().equals(srcDn));
- ContainerPlacementStatus newCPS =
- getPlacementStatus(newReplicaSet, replicationFactor);
-
- if (replicaCount.isOverReplicated() &&
- isPlacementStatusActuallyEqual(currentCPS, newCPS)) {
- sendDeleteCommand(cif, srcDn, true);
- } else {
- // if source and target datanode are both in the replicaset,
- // but we can not delete source datanode for now (e.g.,
- // there is only 3 replicas or not policy-statisfied , etc.),
- // we just complete the future without sending a delete command.
- LOG.info("can not remove source replica after successfully " +
- "replicated to target datanode");
- inflightMoveFuture.get(cid).complete(MoveResult.DELETE_FAIL_POLICY);
- inflightMove.remove(cid);
- inflightMoveFuture.remove(cid);
- }
+ int replicationFactor =
+ cif.getReplicationConfig().getRequiredNodes();
+ ContainerPlacementStatus currentCPS =
+ getPlacementStatus(replicaSet, replicationFactor);
+ Set<ContainerReplica> newReplicaSet = replicaSet.
+ stream().collect(Collectors.toSet());
+ newReplicaSet.removeIf(r -> r.getDatanodeDetails().equals(srcDn));
+ ContainerPlacementStatus newCPS =
+ getPlacementStatus(newReplicaSet, replicationFactor);
+
+ if (replicaCount.isOverReplicated() &&
+ isPlacementStatusActuallyEqual(currentCPS, newCPS)) {
+ sendDeleteCommand(cif, srcDn, true);
+ } else {
+ // if source and target datanode are both in the replicaset,
+ // but we can not delete source datanode for now (e.g.,
+ // there is only 3 replicas or not policy-statisfied , etc.),
+ // we just complete the future without sending a delete command.
+ LOG.info("can not remove source replica after successfully " +
+ "replicated to target datanode");
+ compleleteMoveFutureWithResult(cid, MoveResult.DELETE_FAIL_POLICY);
+ moveScheduler.completeMove(cid.getProtobuf());
}
}
@@ -1680,6 +1723,9 @@ public class ReplicationManager implements SCMService {
lastTimeToBeReadyInMillis = clock.millis();
serviceStatus = ServiceStatus.RUNNING;
}
+ //now, as the current scm is leader and it`s state is up-to-date,
+ //we need to take some action about replicated inflight move options.
+ onLeaderReadyAndOutOfSafeMode();
} else {
serviceStatus = ServiceStatus.PAUSING;
}
@@ -1717,8 +1763,244 @@ public class ReplicationManager implements SCMService {
return inflightDeletion;
}
- public Map<ContainerID,
- Pair<DatanodeDetails, DatanodeDetails>> getInflightMove() {
- return inflightMove;
+ public Map<ContainerID, CompletableFuture<MoveResult>> getInflightMove() {
+ return inflightMoveFuture;
}
-}
\ No newline at end of file
+
+ /**
+ * make move option HA aware.
+ */
+ public interface MoveScheduler {
+ /**
+ * completeMove a move action for a given container.
+ *
+ * @param contianerIDProto Container to which the move option is finished
+ */
+ @Replicate
+ void completeMove(HddsProtos.ContainerID contianerIDProto);
+
+ /**
+ * start a move action for a given container.
+ *
+ * @param contianerIDProto Container to move
+ * @param mp encapsulates the source and target datanode infos
+ */
+ @Replicate
+ void startMove(HddsProtos.ContainerID contianerIDProto,
+ HddsProtos.MoveDataNodePairProto mp) throws IOException;
+
+ /**
+ * get the MoveDataNodePair of the given container.
+ *
+ * @param cid Container to move
+ * @return null if cid is not found in MoveScheduler,
+ * or the corresponding MoveDataNodePair
+ */
+ MoveDataNodePair getMoveDataNodePair(ContainerID cid);
+
+ /**
+ * Reinitialize the MoveScheduler with DB if become leader.
+ */
+ void reinitialize(Table<ContainerID,
+ MoveDataNodePair> moveTable) throws IOException;
+
+ /**
+ * get all the inflight move info.
+ */
+ Map<ContainerID, MoveDataNodePair> getInflightMove();
+ }
+
+ /**
+ * @return the moveScheduler of RM
+ */
+ public MoveScheduler getMoveScheduler() {
+ return moveScheduler;
+ }
+
+ /**
+ * Ratis based MoveScheduler, db operations are stored in
+ * DBTransactionBuffer until a snapshot is taken.
+ */
+ public static final class MoveSchedulerImpl implements MoveScheduler {
+ private Table<ContainerID, MoveDataNodePair> moveTable;
+ private final DBTransactionBuffer transactionBuffer;
+ /**
+ * This is used for tracking container move commands
+ * which are not yet complete.
+ */
+ private final Map<ContainerID, MoveDataNodePair> inflightMove;
+
+ private MoveSchedulerImpl(Table<ContainerID, MoveDataNodePair> moveTable,
+ DBTransactionBuffer transactionBuffer) throws IOException {
+ this.moveTable = moveTable;
+ this.transactionBuffer = transactionBuffer;
+ this.inflightMove = new ConcurrentHashMap<>();
+ initialize();
+ }
+
+ @Override
+ public void completeMove(HddsProtos.ContainerID contianerIDProto) {
+ ContainerID cid = null;
+ try {
+ cid = ContainerID.getFromProtobuf(contianerIDProto);
+ transactionBuffer.removeFromBuffer(moveTable, cid);
+ } catch (IOException e) {
+ LOG.warn("Exception while completing move {}", cid);
+ }
+ inflightMove.remove(cid);
+ }
+
+ @Override
+ public void startMove(HddsProtos.ContainerID contianerIDProto,
+ HddsProtos.MoveDataNodePairProto mdnpp)
+ throws IOException {
+ ContainerID cid = null;
+ MoveDataNodePair mp = null;
+ try {
+ cid = ContainerID.getFromProtobuf(contianerIDProto);
+ mp = MoveDataNodePair.getFromProtobuf(mdnpp);
+ if(!inflightMove.containsKey(cid)) {
+ transactionBuffer.addToBuffer(moveTable, cid, mp);
+ inflightMove.putIfAbsent(cid, mp);
+ }
+ } catch (IOException e) {
+ LOG.warn("Exception while completing move {}", cid);
+ }
+ }
+
+ @Override
+ public MoveDataNodePair getMoveDataNodePair(ContainerID cid) {
+ return inflightMove.get(cid);
+ }
+
+ @Override
+ public void reinitialize(Table<ContainerID,
+ MoveDataNodePair> mt) throws IOException {
+ moveTable = mt;
+ inflightMove.clear();
+ initialize();
+ }
+
+ private void initialize() throws IOException {
+ TableIterator<ContainerID,
+ ? extends Table.KeyValue<ContainerID, MoveDataNodePair>>
+ iterator = moveTable.iterator();
+
+ while (iterator.hasNext()) {
+ Table.KeyValue<ContainerID, MoveDataNodePair> kv = iterator.next();
+ final ContainerID cid = kv.getKey();
+ final MoveDataNodePair mp = kv.getValue();
+ Preconditions.assertNotNull(cid,
+ "moved container id should not be null");
+ Preconditions.assertNotNull(mp,
+ "MoveDataNodePair container id should not be null");
+ inflightMove.put(cid, mp);
+ }
+ }
+
+ @Override
+ public Map<ContainerID, MoveDataNodePair> getInflightMove() {
+ return inflightMove;
+ }
+
+ /**
+ * Builder for Ratis based MoveSchedule.
+ */
+ public static class Builder {
+ private Table<ContainerID, MoveDataNodePair> moveTable;
+ private DBTransactionBuffer transactionBuffer;
+ private SCMRatisServer ratisServer;
+
+ public Builder setRatisServer(final SCMRatisServer scmRatisServer) {
+ ratisServer = scmRatisServer;
+ return this;
+ }
+
+ public Builder setMoveTable(
+ final Table<ContainerID, MoveDataNodePair> mt) {
+ moveTable = mt;
+ return this;
+ }
+
+ public Builder setDBTransactionBuffer(DBTransactionBuffer trxBuffer) {
+ transactionBuffer = trxBuffer;
+ return this;
+ }
+
+ public MoveScheduler build() throws IOException {
+ Preconditions.assertNotNull(moveTable, "moveTable is null");
+ Preconditions.assertNotNull(transactionBuffer,
+ "transactionBuffer is null");
+
+ final MoveScheduler impl =
+ new MoveSchedulerImpl(moveTable, transactionBuffer);
+
+ final SCMHAInvocationHandler invocationHandler
+ = new SCMHAInvocationHandler(MOVE, impl, ratisServer);
+
+ return (MoveScheduler) Proxy.newProxyInstance(
+ SCMHAInvocationHandler.class.getClassLoader(),
+ new Class<?>[]{MoveScheduler.class},
+ invocationHandler);
+ }
+ }
+ }
+
+ /**
+ * when SCM becomes leader-ready and out of safe mode, some actions
+ * should be taken. for now, it is only used to handle replicated
+ * inflight move.
+ private void onLeaderReadyAndOutOfSafeMode() {
+ List<HddsProtos.ContainerID> needToRemove = new LinkedList<>();
+ moveScheduler.getInflightMove().forEach((k, v) -> {
+ Set<ContainerReplica> replicas;
+ ContainerInfo cif;
+ try {
+ replicas = containerManager.getContainerReplicas(k);
+ cif = containerManager.getContainer(k);
+ } catch (ContainerNotFoundException e) {
+ needToRemove.add(k.getProtobuf());
+ LOG.error("can not find container {} " +
+ "while processing replicated move", k);
+ return;
+ }
+ boolean isSrcExist = replicas.stream()
+ .anyMatch(r -> r.getDatanodeDetails().equals(v.getSrc()));
+ boolean isTgtExist = replicas.stream()
+ .anyMatch(r -> r.getDatanodeDetails().equals(v.getTgt()));
+
+ if(isSrcExist) {
+ if(isTgtExist) {
+ //the former scm leader may or may not send the deletion command
+ //before re-election. here, we just try to send the command again.
+ deleteSrcDnForMove(cif, replicas);
+ } else {
+ // resending the replication command is ok, no matter whether there is
+ // an on-going replication
+ sendReplicateCommand(cif, v.getTgt(),
+ Collections.singletonList(v.getSrc()));
+ }
+ } else {
+ // if container does not exist in src datanode, no matter it exists
+ // in target datanode, we can not take more actions to this option,
+ // so just remove it through ratis
+ needToRemove.add(k.getProtobuf());
+ }
+ });
+
+ needToRemove.forEach(moveScheduler::completeMove);
+ }
+
+ /**
+ * complete the CompletableFuture of the container in the given Map with
+ * a given MoveResult.
+ */
+ private void compleleteMoveFutureWithResult(ContainerID cid, MoveResult mr){
+ if(inflightMoveFuture.containsKey(cid)) {
+ inflightMoveFuture.get(cid).complete(mr);
+ inflightMoveFuture.remove(cid);
+ }
+ }
+}
+
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
index de33120..3821575 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMHAManagerImpl.java
@@ -361,6 +361,8 @@ public class SCMHAManagerImpl implements SCMHAManager {
scm.getContainerManager().reinitialize(metadataStore.getContainerTable());
scm.getScmBlockManager().getDeletedBlockLog().reinitialize(
metadataStore.getDeletedBlocksTXTable());
+ scm.getReplicationManager().getMoveScheduler()
+ .reinitialize(metadataStore.getMoveTable());
if (OzoneSecurityUtil.isSecurityEnabled(conf)) {
if (scm.getRootCertificateServer() != null) {
scm.getRootCertificateServer().reinitialize(metadataStore);
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/MoveDataNodePairCodec.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/MoveDataNodePairCodec.java
new file mode 100644
index 0000000..24601a5
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/MoveDataNodePairCodec.java
@@ -0,0 +1,53 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.hadoop.hdds.scm.metadata;
+
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos.MoveDataNodePairProto;
+import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
+import org.apache.hadoop.hdds.utils.db.Codec;
+
+import java.io.IOException;
+
+import static org.apache.hadoop.ozone.ClientVersions.CURRENT_VERSION;
+
+/**
+ * Codec to serialize / deserialize MoveDataNodePair.
+ */
+
+public class MoveDataNodePairCodec implements Codec<MoveDataNodePair> {
+ @Override
+ public byte[] toPersistedFormat(MoveDataNodePair mdnp)
+ throws IOException {
+ return mdnp.getProtobufMessage(CURRENT_VERSION).toByteArray();
+ }
+
+ @Override
+ public MoveDataNodePair fromPersistedFormat(byte[] rawData)
+ throws IOException {
+ MoveDataNodePairProto.Builder builder =
+ MoveDataNodePairProto.newBuilder(
+ MoveDataNodePairProto.PARSER.parseFrom(rawData));
+ return MoveDataNodePair.getFromProtobuf(builder.build());
+ }
+
+ @Override
+ public MoveDataNodePair copyObject(MoveDataNodePair object) {
+ throw new UnsupportedOperationException();
+ }
+}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMDBDefinition.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMDBDefinition.java
index 7da36d1..b3e861b 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMDBDefinition.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMDBDefinition.java
@@ -24,6 +24,7 @@ import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
import org.apache.hadoop.hdds.security.x509.certificate.CertInfo;
import org.apache.hadoop.hdds.security.x509.crl.CRLInfoCodec;
import org.apache.hadoop.hdds.utils.TransactionInfo;
@@ -143,6 +144,16 @@ public class SCMDBDefinition implements DBDefinition {
Long.class,
new LongCodec());
+ public static final DBColumnFamilyDefinition<ContainerID,
+ MoveDataNodePair>
+ MOVE =
+ new DBColumnFamilyDefinition<>(
+ "move",
+ ContainerID.class,
+ new ContainerIDCodec(),
+ MoveDataNodePair.class,
+ new MoveDataNodePairCodec());
+
@Override
public String getName() {
return "scm.db";
@@ -157,6 +168,6 @@ public class SCMDBDefinition implements DBDefinition {
public DBColumnFamilyDefinition[] getColumnFamilies() {
return new DBColumnFamilyDefinition[] {DELETED_BLOCKS, VALID_CERTS,
VALID_SCM_CERTS, REVOKED_CERTS, REVOKED_CERTS_V2, PIPELINES,
CONTAINERS,
- TRANSACTIONINFO, CRLS, CRL_SEQUENCE_ID, SEQUENCE_ID};
+ TRANSACTIONINFO, CRLS, CRL_SEQUENCE_ID, SEQUENCE_ID, MOVE};
}
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreImpl.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreImpl.java
index 170fdde..799e128 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreImpl.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/metadata/SCMMetadataStoreImpl.java
@@ -26,6 +26,7 @@ import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.DeletedBlocksTransaction;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
import org.apache.hadoop.hdds.security.x509.certificate.CertInfo;
import org.apache.hadoop.hdds.utils.HAUtils;
import org.apache.hadoop.hdds.utils.TransactionInfo;
@@ -43,6 +44,7 @@ import static
org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.CONTAINERS;
import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.CRLS;
import static
org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.CRL_SEQUENCE_ID;
import static
org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.DELETED_BLOCKS;
+import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.MOVE;
import static org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.PIPELINES;
import static
org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.REVOKED_CERTS;
import static
org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition.REVOKED_CERTS_V2;
@@ -84,6 +86,8 @@ public class SCMMetadataStoreImpl implements SCMMetadataStore
{
private Table<String, Long> sequenceIdTable;
+ private Table<ContainerID, MoveDataNodePair> moveTable;
+
private static final Logger LOG =
LoggerFactory.getLogger(SCMMetadataStoreImpl.class);
private DBStore store;
@@ -166,6 +170,10 @@ public class SCMMetadataStoreImpl implements
SCMMetadataStore {
sequenceIdTable = SEQUENCE_ID.getTable(store);
checkTableStatus(sequenceIdTable, SEQUENCE_ID.getName());
+
+ moveTable = MOVE.getTable(store);
+
+ checkTableStatus(moveTable, MOVE.getName());
}
}
@@ -266,6 +274,11 @@ public class SCMMetadataStoreImpl implements
SCMMetadataStore {
return sequenceIdTable;
}
+ @Override
+ public Table<ContainerID, MoveDataNodePair> getMoveTable() {
+ return moveTable;
+ }
+
private void checkTableStatus(Table table, String name) throws IOException {
String logMessage = "Unable to get a reference to %s table. Cannot " +
"continue.";
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index d268f2c..e9d439e 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -606,7 +606,9 @@ public final class StorageContainerManager extends
ServiceRuntimeInfoImpl
scmContext,
serviceManager,
scmNodeManager,
- new MonotonicClock(ZoneOffset.UTC));
+ new MonotonicClock(ZoneOffset.UTC),
+ scmHAManager,
+ getScmMetadataStore().getMoveTable());
}
if(configurator.getScmSafeModeManager() != null) {
scmSafeModeManager = configurator.getScmSafeModeManager();
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
index d7fcde7..631ab2b 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
@@ -19,6 +19,7 @@
package org.apache.hadoop.hdds.scm.container;
import com.google.common.primitives.Longs;
+import org.apache.commons.io.FileUtils;
import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -31,18 +32,24 @@ import org.apache.hadoop.hdds.protocol.proto
import org.apache.hadoop.hdds.scm.container.ReplicationManager
.ReplicationManagerConfiguration;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
+import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
import
org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementStatusDefault;
import org.apache.hadoop.hdds.scm.container.ReplicationManager.MoveResult;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
+import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
+import org.apache.hadoop.hdds.scm.metadata.SCMDBTransactionBufferImpl;
+import org.apache.hadoop.hdds.utils.db.DBStore;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
-import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.server.events.EventQueue;
+import org.apache.hadoop.hdds.utils.db.DBStoreBuilder;
import org.apache.hadoop.ozone.protocol.commands.CommandForDatanode;
import org.apache.ozone.test.GenericTestUtils;
import org.apache.ozone.test.TestClock;
@@ -52,6 +59,7 @@ import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
+import java.io.File;
import java.io.IOException;
import java.time.Instant;
import java.time.ZoneId;
@@ -95,24 +103,27 @@ public class TestReplicationManager {
private DatanodeCommandHandler datanodeCommandHandler;
private SimpleMockNodeManager nodeManager;
private ContainerManagerV2 containerManager;
- private OzoneConfiguration conf;
- private SCMNodeManager scmNodeManager;
private GenericTestUtils.LogCapturer scmLogs;
+ private SCMServiceManager serviceManager;
private TestClock clock;
+ private File testDir;
+ private DBStore dbStore;
@Before
public void setup()
throws IOException, InterruptedException, NodeNotFoundException {
- conf = new OzoneConfiguration();
+ OzoneConfiguration conf = new OzoneConfiguration();
conf.setTimeDuration(
HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
0, TimeUnit.SECONDS);
- scmLogs = GenericTestUtils.LogCapturer.captureLogs(ReplicationManager.LOG);
+ scmLogs = GenericTestUtils.LogCapturer
+ .captureLogs(ReplicationManager.LOG);
containerManager = Mockito.mock(ContainerManagerV2.class);
nodeManager = new SimpleMockNodeManager();
eventQueue = new EventQueue();
containerStateManager = new ContainerStateManager(conf);
+ serviceManager = new SCMServiceManager();
datanodeCommandHandler = new DatanodeCommandHandler();
eventQueue.addHandler(SCMEvents.DATANODE_COMMAND, datanodeCommandHandler);
@@ -152,28 +163,29 @@ public class TestReplicationManager {
Mockito.when(containerPlacementPolicy.validateContainerPlacement(
Mockito.any(),
Mockito.anyInt()
- )).thenAnswer(invocation -> {
- return new ContainerPlacementStatusDefault(2, 2, 3);
- });
-
- scmNodeManager = Mockito.mock(SCMNodeManager.class);
- Mockito.when(scmNodeManager.getNodeStatus(
- Mockito.any(DatanodeDetails.class)))
- .thenReturn(NodeStatus.inServiceHealthy());
-
+ )).thenAnswer(invocation ->
+ new ContainerPlacementStatusDefault(2, 2, 3));
clock = new TestClock(Instant.now(), ZoneId.of("UTC"));
createReplicationManager(new ReplicationManagerConfiguration());
}
private void createReplicationManager(ReplicationManagerConfiguration rmConf)
- throws InterruptedException {
+ throws InterruptedException, IOException {
OzoneConfiguration config = new OzoneConfiguration();
+ testDir = GenericTestUtils
+ .getTestDir(TestSCMContainerManager.class.getSimpleName());
+ config.set(HddsConfigKeys.OZONE_METADATA_DIRS,
+ testDir.getAbsolutePath());
config.setTimeDuration(
HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
0, TimeUnit.SECONDS);
config.setFromObject(rmConf);
- SCMServiceManager serviceManager = new SCMServiceManager();
+ SCMHAManager scmHAManager = MockSCMHAManager
+ .getInstance(true, new SCMDBTransactionBufferImpl());
+ dbStore = DBStoreBuilder.createDBStore(
+ config, new SCMDBDefinition());
+
replicationManager = new ReplicationManager(
config,
containerManager,
@@ -182,7 +194,9 @@ public class TestReplicationManager {
SCMContext.emptyContext(),
serviceManager,
nodeManager,
- clock);
+ clock,
+ scmHAManager,
+ SCMDBDefinition.MOVE.getTable(dbStore));
serviceManager.notifyStatusChanged();
scmLogs.clearOutput();
@@ -998,7 +1012,7 @@ public class TestReplicationManager {
@Test
public void testOverReplicatedAndPolicyUnSatisfiedAndDeleted() throws
- SCMException, ContainerNotFoundException, InterruptedException {
+ SCMException {
final ContainerInfo container = getContainer(LifeCycleState.CLOSED);
final ContainerID id = container.containerID();
final UUID originNodeId = UUID.randomUUID();
@@ -1046,7 +1060,7 @@ public class TestReplicationManager {
*/
@Test
public void testUnderReplicatedDueToDecommission() throws
- SCMException, ContainerNotFoundException, InterruptedException {
+ SCMException {
final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
addReplica(container, new NodeStatus(DECOMMISSIONING, HEALTHY), CLOSED);
@@ -1060,7 +1074,7 @@ public class TestReplicationManager {
*/
@Test
public void testUnderReplicatedDueToAllDecommission() throws
- SCMException, ContainerNotFoundException, InterruptedException {
+ SCMException {
final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
addReplica(container, new NodeStatus(DECOMMISSIONING, HEALTHY), CLOSED);
addReplica(container, new NodeStatus(DECOMMISSIONING, HEALTHY), CLOSED);
@@ -1074,7 +1088,7 @@ public class TestReplicationManager {
*/
@Test
public void testCorrectlyReplicatedWithDecommission() throws
- SCMException, ContainerNotFoundException, InterruptedException {
+ SCMException {
final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
@@ -1089,7 +1103,7 @@ public class TestReplicationManager {
*/
@Test
public void testUnderReplicatedDueToMaintenance() throws
- SCMException, ContainerNotFoundException, InterruptedException {
+ SCMException {
final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED);
@@ -1102,12 +1116,13 @@ public class TestReplicationManager {
* min replica for maintenance is 1 and another replica is available.
*/
@Test
- public void testNotUnderReplicatedDueToMaintenanceMinRepOne() throws
- SCMException, ContainerNotFoundException, InterruptedException {
+ public void testNotUnderReplicatedDueToMaintenanceMinRepOne()
+ throws Exception {
replicationManager.stop();
ReplicationManagerConfiguration newConf =
new ReplicationManagerConfiguration();
newConf.setMaintenanceReplicaMinimum(1);
+ dbStore.close();
createReplicationManager(newConf);
final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
@@ -1121,12 +1136,13 @@ public class TestReplicationManager {
* are going off line and min rep is 1.
*/
@Test
- public void testUnderReplicatedDueToMaintenanceMinRepOne() throws
- SCMException, ContainerNotFoundException, InterruptedException {
+ public void testUnderReplicatedDueToMaintenanceMinRepOne()
+ throws Exception {
replicationManager.stop();
ReplicationManagerConfiguration newConf =
new ReplicationManagerConfiguration();
newConf.setMaintenanceReplicaMinimum(1);
+ dbStore.close();
createReplicationManager(newConf);
final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED);
@@ -1141,7 +1157,7 @@ public class TestReplicationManager {
*/
@Test
public void testUnderReplicatedDueToAllMaintenance() throws
- SCMException, ContainerNotFoundException, InterruptedException {
+ SCMException {
final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED);
addReplica(container, new NodeStatus(IN_MAINTENANCE, HEALTHY), CLOSED);
@@ -1155,7 +1171,7 @@ public class TestReplicationManager {
*/
@Test
public void testCorrectlyReplicatedWithMaintenance() throws
- SCMException, ContainerNotFoundException, InterruptedException {
+ SCMException {
final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
addReplica(container, new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
@@ -1170,7 +1186,7 @@ public class TestReplicationManager {
*/
@Test
public void testUnderReplicatedWithDecommissionAndMaintenance() throws
- SCMException, ContainerNotFoundException, InterruptedException {
+ SCMException {
final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
addReplica(container, new NodeStatus(DECOMMISSIONED, HEALTHY), CLOSED);
addReplica(container, new NodeStatus(DECOMMISSIONED, HEALTHY), CLOSED);
@@ -1187,7 +1203,7 @@ public class TestReplicationManager {
*/
@Test
public void testOverReplicatedClosedContainerWithDecomAndMaint()
- throws SCMException, ContainerNotFoundException, InterruptedException {
+ throws SCMException {
final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
addReplica(container, NodeStatus.inServiceHealthy(), CLOSED);
addReplica(container, new NodeStatus(DECOMMISSIONED, HEALTHY), CLOSED);
@@ -1231,7 +1247,7 @@ public class TestReplicationManager {
*/
@Test
public void testUnderReplicatedNotHealthySource()
- throws SCMException, ContainerNotFoundException, InterruptedException {
+ throws SCMException {
final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
addReplica(container, NodeStatus.inServiceStale(), CLOSED);
addReplica(container, new NodeStatus(DECOMMISSIONED, STALE), CLOSED);
@@ -1257,7 +1273,8 @@ public class TestReplicationManager {
new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
DatanodeDetails dn3 = addNode(new NodeStatus(IN_SERVICE, HEALTHY));
CompletableFuture<MoveResult> cf =
- replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
+ replicationManager.move(id,
+ new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
Assert.assertTrue(scmLogs.getOutput().contains(
"receive a move request about container"));
Thread.sleep(100L);
@@ -1284,6 +1301,100 @@ public class TestReplicationManager {
}
/**
+ * if a crash happened and scm restarted, the move operation should work as expected.
+ */
+ @Test
+ public void testMoveCrashAndRestart() throws IOException,
+ NodeNotFoundException, InterruptedException {
+ final ContainerInfo container = createContainer(LifeCycleState.CLOSED);
+ ContainerID id = container.containerID();
+ ContainerReplica dn1 = addReplica(container,
+ new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+ addReplica(container,
+ new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+ addReplica(container,
+ new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
+ DatanodeDetails dn3 = addNode(new NodeStatus(IN_SERVICE, HEALTHY));
+ replicationManager.move(id,
+ new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
+ Assert.assertTrue(scmLogs.getOutput().contains(
+ "receive a move request about container"));
+ Thread.sleep(100L);
+ Assert.assertTrue(datanodeCommandHandler.received(
+ SCMCommandProto.Type.replicateContainerCommand, dn3));
+ Assert.assertEquals(1, datanodeCommandHandler.getInvocationCount(
+ SCMCommandProto.Type.replicateContainerCommand));
+
+ //crash happens, restart scm.
+ //clear current inflight actions and reload inflightMove from DBStore.
+ resetReplicationManager();
+ replicationManager.getMoveScheduler()
+ .reinitialize(SCMDBDefinition.MOVE.getTable(dbStore));
+ Assert.assertTrue(replicationManager.getMoveScheduler()
+ .getInflightMove().containsKey(id));
+ MoveDataNodePair kv = replicationManager.getMoveScheduler()
+ .getInflightMove().get(id);
+ Assert.assertEquals(kv.getSrc(), dn1.getDatanodeDetails());
+ Assert.assertEquals(kv.getTgt(), dn3);
+ serviceManager.notifyStatusChanged();
+
+ Thread.sleep(100L);
+ // now, the container is not over-replicated,
+ // so no deleteContainerCommand will be sent
+ Assert.assertFalse(datanodeCommandHandler.received(
+ SCMCommandProto.Type.deleteContainerCommand,
dn1.getDatanodeDetails()));
+ //replica does not exist in target datanode, so a replicateContainerCommand
+ //will be sent again at notifyStatusChanged#onLeaderReadyAndOutOfSafeMode
+ Assert.assertEquals(2, datanodeCommandHandler.getInvocationCount(
+ SCMCommandProto.Type.replicateContainerCommand));
+
+
+ //replicate container to dn3, now, over-replicated
+ addReplicaToDn(container, dn3, CLOSED);
+ replicationManager.processAll();
+ eventQueue.processAll(1000);
+
+ //deleteContainerCommand is sent, but the src replica is not deleted now
+ Assert.assertEquals(1, datanodeCommandHandler.getInvocationCount(
+ SCMCommandProto.Type.deleteContainerCommand));
+
+ //crash happens, restart scm.
+ //clear current inflight actions and reload inflightMove from DBStore.
+ resetReplicationManager();
+ replicationManager.getMoveScheduler()
+ .reinitialize(SCMDBDefinition.MOVE.getTable(dbStore));
+ Assert.assertTrue(replicationManager.getMoveScheduler()
+ .getInflightMove().containsKey(id));
+ kv = replicationManager.getMoveScheduler()
+ .getInflightMove().get(id);
+ Assert.assertEquals(kv.getSrc(), dn1.getDatanodeDetails());
+ Assert.assertEquals(kv.getTgt(), dn3);
+ serviceManager.notifyStatusChanged();
+
+ //after restart and the container is over-replicated now,
+ //deleteContainerCommand will be sent again
+ Assert.assertEquals(2, datanodeCommandHandler.getInvocationCount(
+ SCMCommandProto.Type.deleteContainerCommand));
+ containerStateManager.removeContainerReplica(id, dn1);
+
+ //replica in src datanode is deleted now
+ containerStateManager.removeContainerReplica(id, dn1);
+ replicationManager.processAll();
+ eventQueue.processAll(1000);
+
+ //since the move is complete, so after scm crash and restart
+ //inflightMove should not contain the container again
+ resetReplicationManager();
+ replicationManager.getMoveScheduler()
+ .reinitialize(SCMDBDefinition.MOVE.getTable(dbStore));
+ Assert.assertFalse(replicationManager.getMoveScheduler()
+ .getInflightMove().containsKey(id));
+
+ //CompletableFuture is not stored in DB, so after scm crash and
+ //restart, CompletableFuture is missing
+ }
+
+ /**
* make sure RM does not delete replica if placement policy is not satisfied.
*/
@Test
@@ -1300,7 +1411,8 @@ public class TestReplicationManager {
new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
DatanodeDetails dn4 = addNode(new NodeStatus(IN_SERVICE, HEALTHY));
CompletableFuture<MoveResult> cf =
- replicationManager.move(id, dn1.getDatanodeDetails(), dn4);
+ replicationManager.move(id,
+ new MoveDataNodePair(dn1.getDatanodeDetails(), dn4));
Assert.assertTrue(scmLogs.getOutput().contains(
"receive a move request about container"));
Thread.sleep(100L);
@@ -1341,7 +1453,8 @@ public class TestReplicationManager {
new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
DatanodeDetails dn3 = addNode(new NodeStatus(IN_SERVICE, HEALTHY));
CompletableFuture<MoveResult> cf =
- replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
+ replicationManager.move(id,
+ new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
Assert.assertTrue(scmLogs.getOutput().contains(
"receive a move request about container"));
@@ -1353,7 +1466,8 @@ public class TestReplicationManager {
MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
nodeManager.setNodeStatus(dn3, new NodeStatus(IN_SERVICE, HEALTHY));
- cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
+ cf = replicationManager.move(id,
+ new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
addReplicaToDn(container, dn3, CLOSED);
replicationManager.processAll();
eventQueue.processAll(1000);
@@ -1384,13 +1498,18 @@ public class TestReplicationManager {
DatanodeDetails dn3 = addNode(new NodeStatus(IN_SERVICE, HEALTHY));
ContainerReplica dn4 = addReplica(container,
new NodeStatus(IN_SERVICE, HEALTHY), CLOSED);
-
+
CompletableFuture<MoveResult> cf;
+ //the above move is executed successfully, so there may be some item in
+ //inflightReplication or inflightDeletion. Here we stop replication manager
+ //to clear these states, which may impact the tests below.
+ //we don't need a running replicationManager now
replicationManager.stop();
Thread.sleep(100L);
- cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
+ cf = replicationManager.move(id,
+ new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
Assert.assertTrue(cf.isDone() && cf.get() ==
- MoveResult.RM_NOT_RUNNING);
+ MoveResult.FAIL_NOT_RUNNING);
replicationManager.start();
Thread.sleep(100L);
@@ -1398,7 +1517,8 @@ public class TestReplicationManager {
for (LifeCycleState state : LifeCycleState.values()) {
if (state != LifeCycleState.CLOSED) {
container.setState(state);
- cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
+ cf = replicationManager.move(id,
+ new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
Assert.assertTrue(cf.isDone() && cf.get() ==
MoveResult.REPLICATION_FAIL_CONTAINER_NOT_CLOSED);
}
@@ -1410,10 +1530,12 @@ public class TestReplicationManager {
if (state != HEALTHY) {
nodeManager.setNodeStatus(dn3,
new NodeStatus(IN_SERVICE, state));
- cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
+ cf = replicationManager.move(id,
+ new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
Assert.assertTrue(cf.isDone() && cf.get() ==
MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
- cf = replicationManager.move(id, dn3, dn1.getDatanodeDetails());
+ cf = replicationManager.move(id,
+ new MoveDataNodePair(dn3, dn1.getDatanodeDetails()));
Assert.assertTrue(cf.isDone() && cf.get() ==
MoveResult.REPLICATION_FAIL_NODE_UNHEALTHY);
}
@@ -1426,10 +1548,12 @@ public class TestReplicationManager {
if (state != IN_SERVICE) {
nodeManager.setNodeStatus(dn3,
new NodeStatus(state, HEALTHY));
- cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
+ cf = replicationManager.move(id,
+ new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
Assert.assertTrue(cf.isDone() && cf.get() ==
MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE);
- cf = replicationManager.move(id, dn3, dn1.getDatanodeDetails());
+ cf = replicationManager.move(id,
+ new MoveDataNodePair(dn3, dn1.getDatanodeDetails()));
Assert.assertTrue(cf.isDone() && cf.get() ==
MoveResult.REPLICATION_FAIL_NODE_NOT_IN_SERVICE);
}
@@ -1437,13 +1561,14 @@ public class TestReplicationManager {
nodeManager.setNodeStatus(dn3, new NodeStatus(IN_SERVICE, HEALTHY));
//container exists in target datanode
- cf = replicationManager.move(id, dn1.getDatanodeDetails(),
- dn2.getDatanodeDetails());
+ cf = replicationManager.move(id,
+ new MoveDataNodePair(dn1.getDatanodeDetails(),
+ dn2.getDatanodeDetails()));
Assert.assertTrue(cf.isDone() && cf.get() ==
MoveResult.REPLICATION_FAIL_EXIST_IN_TARGET);
//container does not exist in source datanode
- cf = replicationManager.move(id, dn3, dn3);
+ cf = replicationManager.move(id, new MoveDataNodePair(dn3, dn3));
Assert.assertTrue(cf.isDone() && cf.get() ==
MoveResult.REPLICATION_FAIL_NOT_EXIST_IN_SOURCE);
@@ -1454,7 +1579,8 @@ public class TestReplicationManager {
replicationManager.processAll();
//waiting for inflightDeletion generation
eventQueue.processAll(1000);
- cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
+ cf = replicationManager.move(id,
+ new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
Assert.assertTrue(cf.isDone() && cf.get() ==
MoveResult.REPLICATION_FAIL_INFLIGHT_DELETION);
resetReplicationManager();
@@ -1467,7 +1593,8 @@ public class TestReplicationManager {
replicationManager.processAll();
//waiting for inflightReplication generation
eventQueue.processAll(1000);
- cf = replicationManager.move(id, dn1.getDatanodeDetails(), dn3);
+ cf = replicationManager.move(id,
+ new MoveDataNodePair(dn1.getDatanodeDetails(), dn3));
Assert.assertTrue(cf.isDone() && cf.get() ==
MoveResult.REPLICATION_FAIL_INFLIGHT_REPLICATION);
}
@@ -1561,7 +1688,7 @@ public class TestReplicationManager {
return replica;
}
- private void assertReplicaScheduled(int delta) throws InterruptedException {
+ private void assertReplicaScheduled(int delta) {
final int currentReplicateCommandCount = datanodeCommandHandler
.getInvocationCount(SCMCommandProto.Type.replicateContainerCommand);
@@ -1588,9 +1715,11 @@ public class TestReplicationManager {
}
@After
- public void teardown() throws IOException {
+ public void teardown() throws Exception {
containerStateManager.close();
replicationManager.stop();
+ dbStore.close();
+ FileUtils.deleteDirectory(testDir);
}
private static class DatanodeCommandHandler implements
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java
index af762d6..80b7240 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDatanodeAdminMonitor.java
@@ -24,6 +24,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto
.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto;
import org.apache.hadoop.hdds.scm.container.*;
+import org.apache.hadoop.hdds.scm.container.ReplicationManager;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.server.events.EventHandler;
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMStoreImplWithOldPipelineIDKeyFormat.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMStoreImplWithOldPipelineIDKeyFormat.java
index c666ef8..fb13d05 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMStoreImplWithOldPipelineIDKeyFormat.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMStoreImplWithOldPipelineIDKeyFormat.java
@@ -29,6 +29,7 @@ import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
+import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
import org.apache.hadoop.hdds.security.x509.certificate.CertInfo;
import org.apache.hadoop.hdds.utils.TransactionInfo;
import org.apache.hadoop.hdds.scm.metadata.PipelineCodec;
@@ -149,6 +150,11 @@ public class TestSCMStoreImplWithOldPipelineIDKeyFormat
return null;
}
+ @Override
+ public Table<ContainerID, MoveDataNodePair> getMoveTable() {
+ return null;
+ }
+
/**
* Test SCM DB Definition for the above class.
*/
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
index 26afe7e..03bc8ba 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
@@ -750,10 +750,9 @@ public class TestStorageContainerManager {
new TestStorageContainerManagerHelper(cluster, conf);
helper.createKeys(10, 4096);
- GenericTestUtils.waitFor(() -> {
- return cluster.getStorageContainerManager().getContainerManager().
- getContainers() != null;
- }, 1000, 10000);
+ GenericTestUtils.waitFor(() ->
+ cluster.getStorageContainerManager().getContainerManager()
+ .getContainers() != null, 1000, 10000);
StorageContainerManager scm = cluster.getStorageContainerManager();
List<ContainerInfo> containers = cluster.getStorageContainerManager()
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestFailoverWithSCMHA.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestFailoverWithSCMHA.java
index b8ad7d5..1607d92 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestFailoverWithSCMHA.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/TestFailoverWithSCMHA.java
@@ -17,7 +17,10 @@
package org.apache.hadoop.ozone.scm;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
+import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.container.ContainerID;
+import org.apache.hadoop.hdds.scm.container.common.helpers.MoveDataNodePair;
import org.apache.hadoop.hdds.scm.ha.SCMHAConfiguration;
import org.apache.hadoop.hdds.scm.protocol.ScmBlockLocationProtocol;
import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol;
@@ -36,10 +39,15 @@ import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.slf4j.event.Level;
+import static org.apache.hadoop.ozone.ClientVersions.CURRENT_VERSION;
import java.io.IOException;
+import java.util.Map;
import java.util.UUID;
+import static org.apache.hadoop.hdds.scm.TestUtils.getContainer;
+import static
org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails;
+
/**
* Tests failover with SCM HA setup.
*/
@@ -136,6 +144,89 @@ public class TestFailoverWithSCMHA {
.contains("Performing failover to suggested leader"));
}
+ @Test
+ public void testMoveFailover() throws Exception {
+ SCMClientConfig scmClientConfig =
+ conf.getObject(SCMClientConfig.class);
+ scmClientConfig.setRetryCount(1);
+ scmClientConfig.setRetryInterval(100);
+ scmClientConfig.setMaxRetryTimeout(1500);
+ Assert.assertEquals(scmClientConfig.getRetryCount(), 15);
+ conf.setFromObject(scmClientConfig);
+ StorageContainerManager scm = getLeader(cluster);
+ Assert.assertNotNull(scm);
+
+ final ContainerID id =
+ getContainer(HddsProtos.LifeCycleState.CLOSED).containerID();
+ DatanodeDetails dn1 = randomDatanodeDetails();
+ DatanodeDetails dn2 = randomDatanodeDetails();
+
+    //here we just want to test whether the new leader will get the same
+    //inflight move after failover, so there is no need to create a container
+    //and a datanode; just mock them, bypassing all the pre-checks.
+ scm.getReplicationManager().getMoveScheduler().startMove(id.getProtobuf(),
+ (new MoveDataNodePair(dn1, dn2)).getProtobufMessage(CURRENT_VERSION));
+
+ SCMBlockLocationFailoverProxyProvider failoverProxyProvider =
+ new SCMBlockLocationFailoverProxyProvider(conf);
+ failoverProxyProvider.changeCurrentProxy(scm.getSCMNodeId());
+ ScmBlockLocationProtocolClientSideTranslatorPB scmBlockLocationClient =
+ new ScmBlockLocationProtocolClientSideTranslatorPB(
+ failoverProxyProvider);
+ GenericTestUtils
+ .setLogLevel(SCMBlockLocationFailoverProxyProvider.LOG, Level.DEBUG);
+ GenericTestUtils.LogCapturer logCapture = GenericTestUtils.LogCapturer
+ .captureLogs(SCMBlockLocationFailoverProxyProvider.LOG);
+ ScmBlockLocationProtocol scmBlockLocationProtocol = TracingUtil
+ .createProxy(scmBlockLocationClient, ScmBlockLocationProtocol.class,
+ conf);
+ scmBlockLocationProtocol.getScmInfo();
+ Assert.assertTrue(logCapture.getOutput()
+ .contains("Performing failover to suggested leader"));
+ scm = getLeader(cluster);
+ Assert.assertNotNull(scm);
+
+    //switch to the new leader successfully, the new leader should
+    //get the same inflightMove
+ Map<ContainerID, MoveDataNodePair> inflightMove =
+ scm.getReplicationManager().getMoveScheduler().getInflightMove();
+ Assert.assertTrue(inflightMove.containsKey(id));
+ MoveDataNodePair mp = inflightMove.get(id);
+ Assert.assertTrue(dn2.equals(mp.getTgt()));
+ Assert.assertTrue(dn1.equals(mp.getSrc()));
+
+ //complete move in the new leader
+ scm.getReplicationManager().getMoveScheduler()
+ .completeMove(id.getProtobuf());
+
+
+ SCMContainerLocationFailoverProxyProvider proxyProvider =
+ new SCMContainerLocationFailoverProxyProvider(conf, null);
+ GenericTestUtils.setLogLevel(SCMContainerLocationFailoverProxyProvider.LOG,
+ Level.DEBUG);
+ logCapture = GenericTestUtils.LogCapturer
+ .captureLogs(SCMContainerLocationFailoverProxyProvider.LOG);
+ proxyProvider.changeCurrentProxy(scm.getSCMNodeId());
+ StorageContainerLocationProtocol scmContainerClient =
+ TracingUtil.createProxy(
+ new StorageContainerLocationProtocolClientSideTranslatorPB(
+ proxyProvider), StorageContainerLocationProtocol.class, conf);
+
+ scmContainerClient.allocateContainer(HddsProtos.ReplicationType.RATIS,
+ HddsProtos.ReplicationFactor.ONE, "ozone");
+ Assert.assertTrue(logCapture.getOutput()
+ .contains("Performing failover to suggested leader"));
+
+    //switch to the new leader successfully, the new leader should
+    //get the same inflightMove, which should not contain
+    //that container.
+ scm = getLeader(cluster);
+ Assert.assertNotNull(scm);
+ inflightMove = scm.getReplicationManager()
+ .getMoveScheduler().getInflightMove();
+ Assert.assertFalse(inflightMove.containsKey(id));
+ }
+
static StorageContainerManager getLeader(MiniOzoneHAClusterImpl impl) {
for (StorageContainerManager scm : impl.getStorageContainerManagers()) {
if (scm.checkLeader()) {
diff --git
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java
index 8dd9b66..7b46941 100644
---
a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java
+++
b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/scm/node/TestDecommissionAndMaintenance.java
@@ -514,7 +514,8 @@ public class TestDecommissionAndMaintenance {
waitForContainerReplicas(newContainer, 3);
ContainerReplicaCount counts =
- scm.getReplicationManager().getContainerReplicaCount(newContainer);
+ scm.getReplicationManager()
+ .getContainerReplicaCount(newContainer.containerID());
assertEquals(1, counts.getMaintenanceCount());
assertTrue(counts.isSufficientlyReplicated());
@@ -541,7 +542,8 @@ public class TestDecommissionAndMaintenance {
waitForContainerReplicas(nextContainer, 3);
// There should be no IN_MAINTENANCE node:
assertEquals(0, nm.getNodeCount(IN_MAINTENANCE, null));
- counts =
scm.getReplicationManager().getContainerReplicaCount(newContainer);
+ counts = scm.getReplicationManager()
+ .getContainerReplicaCount(newContainer.containerID());
assertEquals(0, counts.getMaintenanceCount());
assertTrue(counts.isSufficientlyReplicated());
}
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]