This is an automated email from the ASF dual-hosted git repository.
runzhiwang pushed a commit to branch HDDS-2823
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/HDDS-2823 by this push:
new 3b6918e HDDS-4295. Add SCMServiceManager to SCM HA. (#1784)
3b6918e is described below
commit 3b6918e91c55dfa35dff36d155d04f7bb796b3c2
Author: GlenGeng <[email protected]>
AuthorDate: Thu Jan 28 10:14:26 2021 +0800
HDDS-4295. Add SCMServiceManager to SCM HA. (#1784)
---
.../hadoop/ozone/protocol/commands/SCMCommand.java | 8 +-
.../proto/ScmServerDatanodeHeartbeatProtocol.proto | 8 +-
.../hadoop/hdds/scm/block/BlockManagerImpl.java | 2 +-
.../hdds/scm/block/SCMBlockDeletingService.java | 54 +++-
.../container/AbstractContainerReportHandler.java | 2 +-
.../scm/container/CloseContainerEventHandler.java | 2 +-
.../hdds/scm/container/ReplicationManager.java | 101 +++++--
.../apache/hadoop/hdds/scm/events/SCMEvents.java | 3 -
.../org/apache/hadoop/hdds/scm/ha/SCMContext.java | 114 ++++++--
.../org/apache/hadoop/hdds/scm/ha/SCMService.java | 65 +++++
.../hadoop/hdds/scm/ha/SCMServiceManager.java | 67 +++++
.../apache/hadoop/hdds/scm/ha/SCMStateMachine.java | 8 +-
.../hadoop/hdds/scm/node/NewNodeHandler.java | 16 +-
.../scm/node/NonHealthyToHealthyNodeHandler.java | 18 +-
.../hadoop/hdds/scm/node/SCMNodeManager.java | 2 +-
.../scm/pipeline/BackgroundPipelineCreatorV2.java | 303 +++++++++++++++++++++
.../hdds/scm/pipeline/PipelineActionHandler.java | 2 +-
.../hadoop/hdds/scm/pipeline/PipelineManager.java | 7 +-
.../hdds/scm/pipeline/PipelineManagerV2Impl.java | 98 ++-----
.../hdds/scm/pipeline/PipelineReportHandler.java | 2 +-
.../hdds/scm/pipeline/RatisPipelineProvider.java | 4 +-
.../hdds/scm/pipeline/SCMPipelineManager.java | 4 +-
.../hdds/scm/safemode/SCMSafeModeManager.java | 55 ++--
.../hdds/scm/server/StorageContainerManager.java | 42 ++-
.../hadoop/hdds/scm/block/TestBlockManager.java | 16 +-
.../container/TestCloseContainerEventHandler.java | 13 +-
.../hdds/scm/container/TestReplicationManager.java | 33 ++-
.../scm/container/TestSCMContainerManager.java | 5 +-
.../apache/hadoop/hdds/scm/ha/TestSCMContext.java | 24 +-
.../hadoop/hdds/scm/ha/TestSCMServiceManager.java | 151 ++++++++++
.../hdds/scm/node/TestContainerPlacement.java | 4 +-
.../hadoop/hdds/scm/node/TestDeadNodeHandler.java | 1 -
.../hdds/scm/pipeline/MockPipelineManager.java | 8 -
.../hdds/scm/pipeline/TestPipelineManagerImpl.java | 53 ++--
.../hdds/scm/pipeline/TestSCMPipelineManager.java | 10 +-
.../safemode/TestHealthyPipelineSafeModeRule.java | 27 +-
.../TestOneReplicaPipelineSafeModeRule.java | 11 +-
.../hdds/scm/safemode/TestSCMSafeModeManager.java | 114 +++-----
.../TestRatisPipelineCreateAndDestroy.java | 5 +-
.../hadoop/ozone/TestStorageContainerManager.java | 9 +-
.../ozone/container/TestContainerReplication.java | 2 +-
.../TestCloseContainerByPipeline.java | 10 +-
.../commandhandler/TestCloseContainerHandler.java | 2 +-
.../commandhandler/TestDeleteContainerHandler.java | 8 +-
44 files changed, 1105 insertions(+), 388 deletions(-)
diff --git
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java
index 4d87bb0..6115f16 100644
---
a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java
+++
b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/protocol/commands/SCMCommand.java
@@ -32,11 +32,9 @@ public abstract class SCMCommand<T extends GeneratedMessage>
implements
IdentifiableEventPayload {
private final long id;
- // Under HA mode, holds term of underlying RaftServer iff current
- // SCM is a leader, otherwise, holds term 0.
- // Notes that, the first elected leader is from term 1, term 0,
- // as the initial value of currentTerm, is never used under HA mode.
- private long term = 0;
+ // If running upon Ratis, holds term of underlying RaftServer iff current
+ // SCM is a leader. If running without Ratis, holds SCMContext.INVALID_TERM.
+ private long term;
SCMCommand() {
this.id = HddsIdFactory.getLongId();
diff --git
a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
index c7a2293..410970e 100644
---
a/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
+++
b/hadoop-hdds/interface-server/src/main/proto/ScmServerDatanodeHeartbeatProtocol.proto
@@ -306,11 +306,9 @@ message SCMCommandProto {
optional ClosePipelineCommandProto closePipelineCommandProto = 8;
optional SetNodeOperationalStateCommandProto
setNodeOperationalStateCommandProto = 9;
- // Under HA mode, holds term of underlying RaftServer iff current
- // SCM is a leader, otherwise, holds term 0.
- // Notes that, the first elected leader is from term 1, term 0,
- // as the initial value of currentTerm, is never used under HA mode.
- optional uint64 term = 15;
+ // If running upon Ratis, holds term of underlying RaftServer iff current
+ // SCM is a leader. If running without Ratis, holds SCMContext.INVALID_TERM.
+ optional int64 term = 15;
}
/**
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
index b215026..fb5d5d5 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/BlockManagerImpl.java
@@ -113,7 +113,7 @@ public class BlockManagerImpl implements BlockManager,
BlockmanagerMXBean {
blockDeletingService =
new SCMBlockDeletingService(deletedBlockLog, containerManager,
scm.getScmNodeManager(), scm.getEventQueue(), scm.getScmContext(),
- svcInterval, serviceTimeout, conf);
+ scm.getSCMServiceManager(), svcInterval, serviceTimeout, conf);
}
/**
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
index c3028a4..0a8e897 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/block/SCMBlockDeletingService.java
@@ -23,6 +23,8 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
@@ -32,6 +34,8 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMService;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.server.events.EventPublisher;
@@ -57,7 +61,8 @@ import org.slf4j.LoggerFactory;
* SCM HB thread polls cached commands and sends them to datanode for physical
* processing.
*/
-public class SCMBlockDeletingService extends BackgroundService {
+public class SCMBlockDeletingService extends BackgroundService
+ implements SCMService {
public static final Logger LOG =
LoggerFactory.getLogger(SCMBlockDeletingService.class);
@@ -71,11 +76,18 @@ public class SCMBlockDeletingService extends
BackgroundService {
private int blockDeleteLimitSize;
+ /**
+ * SCMService related variables.
+ */
+ private final Lock serviceLock = new ReentrantLock();
+ private ServiceStatus serviceStatus = ServiceStatus.PAUSING;
+
@SuppressWarnings("parameternumber")
public SCMBlockDeletingService(DeletedBlockLog deletedBlockLog,
ContainerManagerV2 containerManager, NodeManager nodeManager,
EventPublisher eventPublisher, SCMContext scmContext,
- Duration interval, long serviceTimeout, ConfigurationSource conf) {
+ SCMServiceManager serviceManager, Duration interval, long serviceTimeout,
+ ConfigurationSource conf) {
super("SCMBlockDeletingService", interval.toMillis(),
TimeUnit.MILLISECONDS,
BLOCK_DELETING_SERVICE_CORE_POOL_SIZE, serviceTimeout);
this.deletedBlockLog = deletedBlockLog;
@@ -88,6 +100,9 @@ public class SCMBlockDeletingService extends
BackgroundService {
conf.getObject(ScmConfig.class).getBlockDeletionLimit();
Preconditions.checkArgument(blockDeleteLimitSize > 0,
"Block deletion limit should be " + "positive.");
+
+ // register SCMBlockDeletingService to SCMServiceManager
+ serviceManager.register(this);
}
@Override
@@ -121,6 +136,10 @@ public class SCMBlockDeletingService extends
BackgroundService {
@Override
public EmptyTaskResult call() throws Exception {
+ if (!shouldRun()) {
+ return EmptyTaskResult.newResult();
+ }
+
long startTime = Time.monotonicNow();
// Scan SCM DB in HB interval and collect a throttled list of
// to delete blocks.
@@ -153,7 +172,7 @@ public class SCMBlockDeletingService extends
BackgroundService {
// command is bigger than a limit, e.g 50. In case datanode goes
// offline for sometime, the cached commands be flooded.
SCMCommand<?> command = new DeleteBlocksCommand(dnTXs);
- command.setTerm(scmContext.getTerm());
+ command.setTerm(scmContext.getTermOfLeader());
eventPublisher.fireEvent(SCMEvents.DATANODE_COMMAND,
new CommandForDatanode<>(dnId, command));
if (LOG.isDebugEnabled()) {
@@ -200,4 +219,33 @@ public class SCMBlockDeletingService extends
BackgroundService {
public void setBlockDeleteTXNum(int numTXs) {
blockDeleteLimitSize = numTXs;
}
+
+ @Override
+ public void notifyStatusChanged() {
+ serviceLock.lock();
+ try {
+ if (scmContext.isLeader()) {
+ serviceStatus = ServiceStatus.RUNNING;
+ } else {
+ serviceStatus = ServiceStatus.PAUSING;
+ }
+ } finally {
+ serviceLock.unlock();
+ }
+ }
+
+ @Override
+ public boolean shouldRun() {
+ serviceLock.lock();
+ try {
+ return serviceStatus == ServiceStatus.RUNNING;
+ } finally {
+ serviceLock.unlock();
+ }
+ }
+
+ @Override
+ public String getServiceName() {
+ return SCMBlockDeletingService.class.getSimpleName();
+ }
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
index d71539d..d8d31ae 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/AbstractContainerReportHandler.java
@@ -327,7 +327,7 @@ public class AbstractContainerReportHandler {
SCMCommand<?> command = new DeleteContainerCommand(
containerID.getId(), true);
try {
- command.setTerm(scmContext.getTerm());
+ command.setTerm(scmContext.getTermOfLeader());
} catch (NotLeaderException nle) {
logger.warn("Skip sending delete container command," +
" since not leader SCM", nle);
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
index 3320d90..449252c 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/CloseContainerEventHandler.java
@@ -81,7 +81,7 @@ public class CloseContainerEventHandler implements
EventHandler<ContainerID> {
if (container.getState() == LifeCycleState.CLOSING) {
SCMCommand<?> command = new CloseContainerCommand(
containerID.getId(), container.getPipelineID());
- command.setTerm(scmContext.getTerm());
+ command.setTerm(scmContext.getTermOfLeader());
getNodes(container).forEach(node ->
publisher.fireEvent(DATANODE_COMMAND,
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
index 0559c3c..ed439d0 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/container/ReplicationManager.java
@@ -32,13 +32,18 @@ import java.util.Set;
import java.util.StringJoiner;
import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
import java.util.function.Consumer;
import java.util.function.Predicate;
import java.util.stream.Collectors;
+import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.Config;
import org.apache.hadoop.hdds.conf.ConfigGroup;
import org.apache.hadoop.hdds.conf.ConfigType;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
@@ -48,11 +53,11 @@ import org.apache.hadoop.hdds.scm.ContainerPlacementStatus;
import org.apache.hadoop.hdds.scm.PlacementPolicy;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMService;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
-import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
-import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.metrics2.MetricsCollector;
import org.apache.hadoop.metrics2.MetricsInfo;
@@ -84,8 +89,7 @@ import org.slf4j.LoggerFactory;
* that the containers are properly replicated. Replication Manager deals only
* with Quasi Closed / Closed container.
*/
-public class ReplicationManager
- implements MetricsSource, EventHandler<SafeModeStatus> {
+public class ReplicationManager implements MetricsSource, SCMService {
public static final Logger LOG =
LoggerFactory.getLogger(ReplicationManager.class);
@@ -138,7 +142,7 @@ public class ReplicationManager
/**
* ReplicationManager specific configuration.
*/
- private final ReplicationManagerConfiguration conf;
+ private final ReplicationManagerConfiguration rmConf;
/**
* ReplicationMonitor thread is the one which wakes up at configured
@@ -158,6 +162,16 @@ public class ReplicationManager
private int minHealthyForMaintenance;
/**
+ * SCMService related variables.
+ * After leaving safe mode, replicationMonitor needs to wait for a while
+ * before it really takes effect.
+ */
+ private final Lock serviceLock = new ReentrantLock();
+ private ServiceStatus serviceStatus = ServiceStatus.PAUSING;
+ private final long waitTimeInMillis;
+ private long lastTimeToBeReadyInMillis = 0;
+
+ /**
* Constructs ReplicationManager instance with the given configuration.
*
* @param conf OzoneConfiguration
@@ -165,11 +179,13 @@ public class ReplicationManager
* @param containerPlacement PlacementPolicy
* @param eventPublisher EventPublisher
*/
- public ReplicationManager(final ReplicationManagerConfiguration conf,
+ @SuppressWarnings("parameternumber")
+ public ReplicationManager(final ConfigurationSource conf,
final ContainerManagerV2 containerManager,
final PlacementPolicy containerPlacement,
final EventPublisher eventPublisher,
final SCMContext scmContext,
+ final SCMServiceManager serviceManager,
final LockManager<ContainerID> lockManager,
final NodeManager nodeManager) {
this.containerManager = containerManager;
@@ -178,11 +194,22 @@ public class ReplicationManager
this.scmContext = scmContext;
this.lockManager = lockManager;
this.nodeManager = nodeManager;
- this.conf = conf;
+ this.rmConf = conf.getObject(ReplicationManagerConfiguration.class);
this.running = false;
this.inflightReplication = new ConcurrentHashMap<>();
this.inflightDeletion = new ConcurrentHashMap<>();
- this.minHealthyForMaintenance = conf.getMaintenanceReplicaMinimum();
+ this.minHealthyForMaintenance = rmConf.getMaintenanceReplicaMinimum();
+
+ this.waitTimeInMillis = conf.getTimeDuration(
+ HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
+ HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT_DEFAULT,
+ TimeUnit.MILLISECONDS);
+
+ // register ReplicationManager to SCMServiceManager.
+ serviceManager.register(this);
+
+ // start ReplicationManager.
+ start();
}
/**
@@ -263,7 +290,7 @@ public class ReplicationManager
" processing {} containers.", Time.monotonicNow() - start,
containers.size());
- wait(conf.getInterval());
+ wait(rmConf.getInterval());
}
} catch (Throwable t) {
// When we get runtime exception, we should terminate SCM.
@@ -278,6 +305,10 @@ public class ReplicationManager
* @param container ContainerInfo
*/
private void processContainer(ContainerInfo container) {
+ if (!shouldRun()) {
+ return;
+ }
+
final ContainerID id = container.containerID();
lockManager.lock(id);
try {
@@ -419,7 +450,7 @@ public class ReplicationManager
final Map<ContainerID, List<InflightAction>> inflightActions,
final Predicate<InflightAction> filter) {
final ContainerID id = container.containerID();
- final long deadline = Time.monotonicNow() - conf.getEventTimeout();
+ final long deadline = Time.monotonicNow() - rmConf.getEventTimeout();
if (inflightActions.containsKey(id)) {
final List<InflightAction> actions = inflightActions.get(id);
@@ -971,7 +1002,7 @@ public class ReplicationManager
new CloseContainerCommand(container.getContainerID(),
container.getPipelineID(), force);
try {
- closeContainerCommand.setTerm(scmContext.getTerm());
+ closeContainerCommand.setTerm(scmContext.getTermOfLeader());
} catch (NotLeaderException nle) {
LOG.warn("Skip sending close container command,"
+ " since current SCM is not leader.", nle);
@@ -1043,7 +1074,7 @@ public class ReplicationManager
final SCMCommand<T> command,
final Consumer<InflightAction> tracker) {
try {
- command.setTerm(scmContext.getTerm());
+ command.setTerm(scmContext.getTermOfLeader());
} catch (NotLeaderException nle) {
LOG.warn("Skip sending datanode command,"
+ " since current SCM is not leader.", nle);
@@ -1120,14 +1151,6 @@ public class ReplicationManager
.endRecord();
}
- @Override
- public void onMessage(SafeModeStatus status,
- EventPublisher publisher) {
- if (!status.isInSafeMode() && !this.isRunning()) {
- this.start();
- }
- }
-
/**
* Wrapper class to hold the InflightAction with its start time.
*/
@@ -1241,4 +1264,42 @@ public class ReplicationManager
.toString();
}
}
+
+ @Override
+ public void notifyStatusChanged() {
+ serviceLock.lock();
+ try {
+ // 1) SCMContext#isLeader returns true.
+ // 2) not in safe mode.
+ if (scmContext.isLeader() && !scmContext.isInSafeMode()) {
+ // transition from PAUSING to RUNNING
+ if (serviceStatus != ServiceStatus.RUNNING) {
+ LOG.info("Service {} transitions to RUNNING.", getServiceName());
+ lastTimeToBeReadyInMillis = Time.monotonicNow();
+ serviceStatus = ServiceStatus.RUNNING;
+ }
+ } else {
+ serviceStatus = ServiceStatus.PAUSING;
+ }
+ } finally {
+ serviceLock.unlock();
+ }
+ }
+
+ @Override
+ public boolean shouldRun() {
+ serviceLock.lock();
+ try {
+ // If safe mode is off, then this SCMService starts to run with a delay.
+ return serviceStatus == ServiceStatus.RUNNING &&
+ Time.monotonicNow() - lastTimeToBeReadyInMillis >= waitTimeInMillis;
+ } finally {
+ serviceLock.unlock();
+ }
+ }
+
+ @Override
+ public String getServiceName() {
+ return ReplicationManager.class.getSimpleName();
+ }
}
\ No newline at end of file
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
index 6f6cc54..d01257b 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/events/SCMEvents.java
@@ -194,9 +194,6 @@ public final class SCMEvents {
public static final TypedEvent<SafeModeStatus> SAFE_MODE_STATUS =
new TypedEvent<>(SafeModeStatus.class, "Safe mode status");
- public static final TypedEvent<SafeModeStatus> DELAYED_SAFE_MODE_STATUS =
- new TypedEvent<>(SafeModeStatus.class, "Delayed safe mode status");
-
/**
* Private Ctor. Never Constructed.
*/
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMContext.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMContext.java
index 17dad7e..2d4941f 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMContext.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMContext.java
@@ -20,8 +20,6 @@ package org.apache.hadoop.hdds.scm.ha;
import com.google.common.base.Preconditions;
import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
-import org.apache.hadoop.hdds.server.events.EventHandler;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -32,14 +30,24 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
/**
* SCMContext is the single source of truth for some key information shared
* across all components within SCM, including:
- * - RaftServer related info, e.g., isLeader, term.
- * - SafeMode related info, e.g., inSafeMode, preCheckComplete.
+ * 1) RaftServer related info, e.g., isLeader, term.
+ * 2) SafeMode related info, e.g., inSafeMode, preCheckComplete.
+ *
+ * If current SCM is not running upon Ratis, the {@link SCMContext#isLeader}
+ * check will always return true, and {@link SCMContext#getTermOfLeader} will
+ * return INVALID_TERM.
*/
-public class SCMContext implements EventHandler<SafeModeStatus> {
+public final class SCMContext {
private static final Logger LOG = LoggerFactory.getLogger(SCMContext.class);
+ /**
+ * The initial value of term in raft is 0, and term increases monotonically.
+ * term equals INVALID_TERM indicates current SCM is running without Ratis.
+ */
+ public static final long INVALID_TERM = -1;
+
private static final SCMContext EMPTY_CONTEXT
- = new SCMContext(true, 0, new SafeModeStatus(false, true), null);
+ = new SCMContext.Builder().build();
/**
* Used by non-HA mode SCM, Recon and Unit Tests.
@@ -62,9 +70,8 @@ public class SCMContext implements
EventHandler<SafeModeStatus> {
private final StorageContainerManager scm;
private final ReadWriteLock lock = new ReentrantReadWriteLock();
- SCMContext(boolean isLeader, long term,
- final SafeModeStatus safeModeStatus,
- final StorageContainerManager scm) {
+ private SCMContext(boolean isLeader, long term,
+ final SafeModeStatus safeModeStatus, final StorageContainerManager scm) {
this.isLeader = isLeader;
this.term = term;
this.safeModeStatus = safeModeStatus;
@@ -72,25 +79,16 @@ public class SCMContext implements
EventHandler<SafeModeStatus> {
}
/**
- * Creates SCMContext instance from StorageContainerManager.
- */
- public SCMContext(final StorageContainerManager scm) {
- this(false, 0, new SafeModeStatus(true, false), scm);
- Preconditions.checkNotNull(scm, "scm is null");
- }
-
- /**
- *
- * @param newIsLeader : is leader or not
- * @param newTerm : term if current SCM becomes leader
+ * @param leader : is leader or not
+ * @param newTerm : term if current SCM becomes leader
*/
- public void updateIsLeaderAndTerm(boolean newIsLeader, long newTerm) {
+ public void updateLeaderAndTerm(boolean leader, long newTerm) {
lock.writeLock().lock();
try {
LOG.info("update <isLeader,term> from <{},{}> to <{},{}>",
- isLeader, term, newIsLeader, newTerm);
+ isLeader, term, leader, newTerm);
- isLeader = newIsLeader;
+ isLeader = leader;
term = newTerm;
} finally {
lock.writeLock().unlock();
@@ -105,6 +103,10 @@ public class SCMContext implements
EventHandler<SafeModeStatus> {
public boolean isLeader() {
lock.readLock().lock();
try {
+ if (term == INVALID_TERM) {
+ return true;
+ }
+
return isLeader;
} finally {
lock.readLock().unlock();
@@ -117,9 +119,13 @@ public class SCMContext implements
EventHandler<SafeModeStatus> {
* @return term
* @throws NotLeaderException if isLeader is false
*/
- public long getTerm() throws NotLeaderException {
+ public long getTermOfLeader() throws NotLeaderException {
lock.readLock().lock();
try {
+ if (term == INVALID_TERM) {
+ return term;
+ }
+
if (!isLeader) {
LOG.warn("getTerm is invoked when not leader.");
throw scm.getScmHAManager()
@@ -132,9 +138,10 @@ public class SCMContext implements
EventHandler<SafeModeStatus> {
}
}
- @Override
- public void onMessage(SafeModeStatus status,
- EventPublisher publisher) {
+ /**
+ * @param status : update SCMContext with latest SafeModeStatus.
+ */
+ public void updateSafeModeStatus(SafeModeStatus status) {
lock.writeLock().lock();
try {
LOG.info("Update SafeModeStatus from {} to {}.", safeModeStatus, status);
@@ -161,4 +168,57 @@ public class SCMContext implements
EventHandler<SafeModeStatus> {
lock.readLock().unlock();
}
}
+
+ /**
+ * @return StorageContainerManager
+ */
+ public StorageContainerManager getScm() {
+ Preconditions.checkNotNull(scm, "scm == null");
+ return scm;
+ }
+
+ public static class Builder {
+ /**
+ * The default context:
+ * running without Ratis, out of safe mode, and has completed preCheck.
+ */
+ private boolean isLeader = false;
+ private long term = INVALID_TERM;
+ private boolean isInSafeMode = false;
+ private boolean isPreCheckComplete = true;
+ private StorageContainerManager scm = null;
+
+ public Builder setLeader(boolean leader) {
+ this.isLeader = leader;
+ return this;
+ }
+
+ public Builder setTerm(long newTerm) {
+ this.term = newTerm;
+ return this;
+ }
+
+ public Builder setIsInSafeMode(boolean inSafeMode) {
+ this.isInSafeMode = inSafeMode;
+ return this;
+ }
+
+ public Builder setIsPreCheckComplete(boolean preCheckComplete) {
+ this.isPreCheckComplete = preCheckComplete;
+ return this;
+ }
+
+ public Builder setSCM(StorageContainerManager storageContainerManager) {
+ this.scm = storageContainerManager;
+ return this;
+ }
+
+ public SCMContext build() {
+ return new SCMContext(
+ isLeader,
+ term,
+ new SafeModeStatus(isInSafeMode, isPreCheckComplete),
+ scm);
+ }
+ }
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMService.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMService.java
new file mode 100644
index 0000000..32194a6
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMService.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.ha;
+
+/**
+ * Interface for background services in SCM, including ReplicationManager,
+ * SCMBlockDeletingService and BackgroundPipelineCreator.
+ *
+ * Provide a fine-grained method to manipulate the status of these background
+ * services.
+ */
+public interface SCMService {
+ /**
+ * Notify that raft or safe mode related status has changed.
+ */
+ void notifyStatusChanged();
+
+ /**
+ * @param event latest triggered event.
+ */
+ default void notifyEventTriggered(Event event) {
+ }
+
+ /**
+ * @return true, if next iteration of Service should take effect,
+ * false, if next iteration of Service should be skipped.
+ */
+ boolean shouldRun();
+
+ /**
+ * @return name of the Service.
+ */
+ String getServiceName();
+
+ /**
+ * Status of Service.
+ */
+ enum ServiceStatus {
+ RUNNING,
+ PAUSING
+ }
+
+ /**
+ * One time event.
+ */
+ enum Event {
+ PRE_CHECK_COMPLETED,
+ NEW_NODE_HANDLER_TRIGGERED,
+ UNHEALTHY_TO_HEALTHY_NODE_HANDLER_TRIGGERED
+ }
+}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMServiceManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMServiceManager.java
new file mode 100644
index 0000000..52e3d26
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMServiceManager.java
@@ -0,0 +1,67 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with this
+ * work for additional information regarding copyright ownership. The ASF
+ * licenses this file to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ * WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ * License for the specific language governing permissions and limitations under
+ * the License.
+ */
+package org.apache.hadoop.hdds.scm.ha;
+
+import com.google.common.base.Preconditions;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.hadoop.hdds.scm.ha.SCMService.*;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/**
+ * Manipulate background services in SCM, including ReplicationManager,
+ * SCMBlockDeletingService and BackgroundPipelineCreator.
+ */
+public final class SCMServiceManager {
+ private static final Logger LOG =
+ LoggerFactory.getLogger(SCMServiceManager.class);
+
+ private final List<SCMService> services = new ArrayList<>();
+
+ /**
+ * Register a SCMService to SCMServiceManager.
+ */
+ public synchronized void register(SCMService service) {
+ Preconditions.checkNotNull(service);
+ LOG.info("Registering service {}.", service.getServiceName());
+ services.add(service);
+ }
+
+ /**
+ * Notify that raft or safe mode related status has changed.
+ */
+ public synchronized void notifyStatusChanged() {
+ for (SCMService service : services) {
+ LOG.debug("Notify service:{}.", service.getServiceName());
+ service.notifyStatusChanged();
+ }
+ }
+
+ /**
+ * Notify every registered SCMService that a one-time event was triggered.
+ */
+ public synchronized void notifyEventTriggered(Event event) {
+ for (SCMService service : services) {
+ LOG.debug("Notify service:{} with event:{}.",
+ service.getServiceName(), event);
+ service.notifyEventTriggered(event);
+ }
+ }
+}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
index aa366e1..a04f0d8 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/ha/SCMStateMachine.java
@@ -133,7 +133,9 @@ public class SCMStateMachine extends BaseStateMachine {
@Override
public void notifyNotLeader(Collection<TransactionContext> pendingEntries) {
LOG.info("current leader SCM steps down.");
- scm.getScmContext().updateIsLeaderAndTerm(false, 0);
+
+ scm.getScmContext().updateLeaderAndTerm(false, 0);
+ scm.getSCMServiceManager().notifyStatusChanged();
}
@Override
@@ -151,7 +153,9 @@ public class SCMStateMachine extends BaseStateMachine {
.getCurrentTerm();
LOG.info("current SCM becomes leader of term {}.", term);
- scm.getScmContext().updateIsLeaderAndTerm(true, term);
+
+ scm.getScmContext().updateLeaderAndTerm(true, term);
+ scm.getSCMServiceManager().notifyStatusChanged();
}
@Override
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java
index c511b55..674cf2d 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NewNodeHandler.java
@@ -21,11 +21,12 @@ package org.apache.hadoop.hdds.scm.node;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ha.SCMService.Event;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,32 +40,33 @@ public class NewNodeHandler implements
EventHandler<DatanodeDetails> {
private final PipelineManager pipelineManager;
private final NodeDecommissionManager decommissionManager;
private final ConfigurationSource conf;
+ private final SCMServiceManager serviceManager;
public NewNodeHandler(PipelineManager pipelineManager,
NodeDecommissionManager decommissionManager,
- ConfigurationSource conf) {
+ ConfigurationSource conf,
+ SCMServiceManager serviceManager) {
this.pipelineManager = pipelineManager;
this.decommissionManager = decommissionManager;
this.conf = conf;
+ this.serviceManager = serviceManager;
}
@Override
public void onMessage(DatanodeDetails datanodeDetails,
EventPublisher publisher) {
try {
- pipelineManager.triggerPipelineCreation();
+ serviceManager.notifyEventTriggered(Event.NEW_NODE_HANDLER_TRIGGERED);
+
if (datanodeDetails.getPersistedOpState()
!= HddsProtos.NodeOperationalState.IN_SERVICE) {
decommissionManager.continueAdminForNode(datanodeDetails);
}
- }catch (NodeNotFoundException e) {
+ } catch (NodeNotFoundException e) {
// Should not happen, as the node has just registered to call this event
// handler.
LOG.warn("NodeNotFound when adding the node to the decommissionManager",
e);
- } catch (NotLeaderException nle) {
- LOG.debug("Not the current leader SCM and cannot start pipeline" +
- " creation.");
}
}
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NonHealthyToHealthyNodeHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NonHealthyToHealthyNodeHandler.java
index 1cb6501..d74f90f 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NonHealthyToHealthyNodeHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/NonHealthyToHealthyNodeHandler.java
@@ -20,10 +20,10 @@ package org.apache.hadoop.hdds.scm.node;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
+import org.apache.hadoop.hdds.scm.ha.SCMService.Event;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.ratis.protocol.exceptions.NotLeaderException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -35,23 +35,19 @@ public class NonHealthyToHealthyNodeHandler
private static final Logger LOG =
LoggerFactory.getLogger(NonHealthyToHealthyNodeHandler.class);
- private final PipelineManager pipelineManager;
private final ConfigurationSource conf;
+ private final SCMServiceManager serviceManager;
public NonHealthyToHealthyNodeHandler(
- PipelineManager pipelineManager, OzoneConfiguration conf) {
- this.pipelineManager = pipelineManager;
+ OzoneConfiguration conf, SCMServiceManager serviceManager) {
this.conf = conf;
+ this.serviceManager = serviceManager;
}
@Override
public void onMessage(DatanodeDetails datanodeDetails,
EventPublisher publisher) {
- try {
- pipelineManager.triggerPipelineCreation();
- } catch (NotLeaderException ex) {
- LOG.debug("Not the current leader SCM and cannot start pipeline" +
- " creation.");
- }
+ serviceManager.notifyEventTriggered(
+ Event.UNHEALTHY_TO_HEALTHY_NODE_HANDLER_TRIGGERED);
}
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
index 4e6f53e..e2e50e8 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/node/SCMNodeManager.java
@@ -433,7 +433,7 @@ public class SCMNodeManager implements NodeManager {
Time.monotonicNow(),
scmStatus.getOperationalState(),
scmStatus.getOpStateExpiryEpochSeconds());
- command.setTerm(scmContext.getTerm());
+ command.setTerm(scmContext.getTermOfLeader());
addDatanodeCommand(reportedDn.getUuid(), command);
} catch (NotLeaderException nle) {
LOG.warn("Skip sending SetNodeOperationalStateCommand,"
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreatorV2.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreatorV2.java
new file mode 100644
index 0000000..343444d
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/BackgroundPipelineCreatorV2.java
@@ -0,0 +1,303 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hadoop.hdds.scm.pipeline;
+
+import com.google.common.util.concurrent.ThreadFactoryBuilder;
+import org.apache.commons.collections.iterators.LoopingIterator;
+import org.apache.hadoop.hdds.HddsConfigKeys;
+import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
+import org.apache.hadoop.hdds.scm.ScmConfigKeys;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMService;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
+import org.apache.hadoop.ozone.OzoneConfigKeys;
+import org.apache.hadoop.util.Time;
+import org.apache.ratis.util.ExitUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import static
org.apache.hadoop.hdds.scm.ha.SCMService.Event.UNHEALTHY_TO_HEALTHY_NODE_HANDLER_TRIGGERED;
+import static
org.apache.hadoop.hdds.scm.ha.SCMService.Event.NEW_NODE_HANDLER_TRIGGERED;
+import static
org.apache.hadoop.hdds.scm.ha.SCMService.Event.PRE_CHECK_COMPLETED;
+
+/**
+ * Implements api for running background pipeline creation jobs.
+ */
+public class BackgroundPipelineCreatorV2 implements SCMService {
+
+ private static final Logger LOG =
+ LoggerFactory.getLogger(BackgroundPipelineCreator.class);
+
+ private final PipelineManager pipelineManager;
+ private final ConfigurationSource conf;
+ private final SCMContext scmContext;
+
+ /**
+ * SCMService related variables.
+ * 1) after leaving safe mode, BackgroundPipelineCreator needs to
+ * wait for a while before really take effect.
+ * 2) NewNodeHandler, NonHealthyToHealthyNodeHandler, PreCheckComplete
+ * will trigger a one-shot run of BackgroundPipelineCreator,
+ * no matter in safe mode or not.
+ */
+ private final Lock serviceLock = new ReentrantLock();
+ private ServiceStatus serviceStatus = ServiceStatus.PAUSING;
+ private final boolean createPipelineInSafeMode;
+ private final long waitTimeInMillis;
+ private long lastTimeToBeReadyInMillis = 0;
+ private boolean oneShotRun = false;
+
+ /**
+ * RatisPipelineUtilsThread is the one which wakes up at
+ * configured interval and tries to create pipelines.
+ */
+ private Thread thread;
+ private final Object monitor = new Object();
+ private static final String THREAD_NAME = "RatisPipelineUtilsThread";
+ private final AtomicBoolean running = new AtomicBoolean(false);
+ private final long intervalInMillis;
+
+
+  /**
+   * Build the creator, register it with the service manager and start the
+   * background thread.
+   *
+   * @param pipelineManager used to scrub and create pipelines.
+   * @param conf source of the safe mode and interval settings read here.
+   * @param serviceManager manager this instance registers itself with.
+   * @param scmContext consulted for leader and safe mode state.
+   */
+  BackgroundPipelineCreatorV2(PipelineManager pipelineManager,
+      ConfigurationSource conf,
+      SCMServiceManager serviceManager,
+      SCMContext scmContext) {
+    // collaborators
+    this.pipelineManager = pipelineManager;
+    this.conf = conf;
+    this.scmContext = scmContext;
+
+    // whether pipelines may be created while SCM is still in safe mode
+    this.createPipelineInSafeMode = conf.getBoolean(
+        HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION,
+        HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION_DEFAULT);
+
+    // delay applied after the service becomes RUNNING
+    this.waitTimeInMillis = conf.getTimeDuration(
+        HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
+        HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT_DEFAULT,
+        TimeUnit.MILLISECONDS);
+
+    // period between two wake-ups of the background thread
+    this.intervalInMillis = conf.getTimeDuration(
+        ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL,
+        ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL_DEFAULT,
+        TimeUnit.MILLISECONDS);
+
+    // All fields are assigned by now: make this service visible to the
+    // SCMServiceManager first, then launch RatisPipelineUtilsThread.
+    serviceManager.register(this);
+    start();
+  }
+
+  /**
+   * Start RatisPipelineUtilsThread.
+   *
+   * <p>Only logs a warning when the thread has already been started. An
+   * uncaught exception in the thread stops SCM and then calls
+   * ExitUtils.terminate with status 1.
+   */
+  public void start() {
+    final boolean switchedOn = running.compareAndSet(false, true);
+    if (!switchedOn) {
+      LOG.warn("{} is already started, just ignore.", THREAD_NAME);
+      return;
+    }
+
+    LOG.info("Starting {}.", THREAD_NAME);
+
+    final Thread.UncaughtExceptionHandler onUncaught = (worker, cause) -> {
+      // gracefully shutdown SCM.
+      scmContext.getScm().stop();
+
+      ExitUtils.terminate(1,
+          "Terminate SCM, encounter uncaught exception"
+              + " in RatisPipelineUtilsThread",
+          cause, LOG);
+    };
+
+    thread = new ThreadFactoryBuilder()
+        .setDaemon(false)
+        .setNameFormat(THREAD_NAME + " - %d")
+        .setUncaughtExceptionHandler(onUncaught)
+        .build()
+        .newThread(this::run);
+    thread.start();
+  }
+
+  /**
+   * Stop RatisPipelineUtilsThread.
+   *
+   * <p>Clears the running flag, wakes the thread in case it is waiting on
+   * the monitor and then blocks until the thread has exited. When the
+   * thread is not running this only logs a warning.
+   */
+  public void stop() {
+    // compareAndSet(true, false) fails only when the flag was already
+    // false, i.e. there is no running thread to stop.
+    if (!running.compareAndSet(true, false)) {
+      LOG.warn("{} is not running, just ignore.", THREAD_NAME);
+      return;
+    }
+
+    LOG.info("Stopping {}.", THREAD_NAME);
+
+    // in case RatisPipelineUtilsThread is sleeping
+    synchronized (monitor) {
+      monitor.notifyAll();
+    }
+
+    try {
+      thread.join();
+    } catch (InterruptedException e) {
+      LOG.warn("Interrupted during join {}.", THREAD_NAME);
+      // keep the interrupt visible to the caller
+      Thread.currentThread().interrupt();
+    }
+  }
+
+  /**
+   * Main loop of RatisPipelineUtilsThread.
+   *
+   * <p>While the running flag is set: create pipelines if
+   * {@link #shouldRun()} allows it, then wait on the monitor for at most
+   * {@code intervalInMillis}. The wait ends early when the monitor is
+   * notified. The loop also ends when the thread is interrupted.
+   */
+  private void run() {
+    while (running.get()) {
+      if (shouldRun()) {
+        createPipelines();
+      }
+
+      try {
+        synchronized (monitor) {
+          monitor.wait(intervalInMillis);
+        }
+      } catch (InterruptedException e) {
+        LOG.warn("{} is interrupted.", THREAD_NAME);
+        // Restore the interrupt flag and leave the loop. With the flag set,
+        // every following wait() would throw immediately and this loop
+        // would degrade into a busy spin.
+        Thread.currentThread().interrupt();
+        return;
+      }
+    }
+  }
+
+  /**
+   * Decide whether pipelines of the given factor must not be created.
+   *
+   * @param factor replication factor under consideration.
+   * @param type configured replication type.
+   * @param autoCreate whether factor ONE pipelines are auto-created.
+   * @return true if creation has to be skipped for this factor.
+   */
+  private boolean skipCreation(HddsProtos.ReplicationFactor factor,
+      HddsProtos.ReplicationType type,
+      boolean autoCreate) {
+    if (type != HddsProtos.ReplicationType.RATIS) {
+      // For STAND_ALONE Replication Type, Replication Factor 3 should not
+      // be used.
+      return factor == HddsProtos.ReplicationFactor.THREE;
+    }
+    // RATIS: factor ONE is skipped unless auto creation is switched on.
+    return !autoCreate && factor == HddsProtos.ReplicationFactor.ONE;
+  }
+
+  /**
+   * One round of pipeline creation.
+   *
+   * <p>First collects the replication factors that are not skipped for the
+   * configured replication type, scrubbing pipelines of each of them when
+   * not in safe mode. Then keeps cycling over the collected factors,
+   * creating one pipeline per visit, and drops a factor as soon as its
+   * creation attempt throws.
+   */
+  private void createPipelines() throws RuntimeException {
+    // TODO: #CLUTIL Different replication factor may need to be supported
+    final String typeName = conf.get(
+        OzoneConfigKeys.OZONE_REPLICATION_TYPE,
+        OzoneConfigKeys.OZONE_REPLICATION_TYPE_DEFAULT);
+    final HddsProtos.ReplicationType type =
+        HddsProtos.ReplicationType.valueOf(typeName);
+    final boolean autoCreateFactorOne = conf.getBoolean(
+        ScmConfigKeys.OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE,
+        ScmConfigKeys.OZONE_SCM_PIPELINE_AUTO_CREATE_FACTOR_ONE_DEFAULT);
+
+    final List<HddsProtos.ReplicationFactor> candidates = new ArrayList<>();
+    for (HddsProtos.ReplicationFactor factor :
+        HddsProtos.ReplicationFactor.values()) {
+      if (!skipCreation(factor, type, autoCreateFactorOne)) {
+        candidates.add(factor);
+        if (!pipelineManager.getSafeModeStatus()) {
+          try {
+            pipelineManager.scrubPipeline(type, factor);
+          } catch (IOException e) {
+            LOG.error("Error while scrubbing pipelines.", e);
+          }
+        }
+      }
+    }
+
+    // The looping iterator wraps around, so the loop only ends once every
+    // candidate has been removed.
+    final LoopingIterator cursor = new LoopingIterator(candidates);
+    while (cursor.hasNext()) {
+      final HddsProtos.ReplicationFactor factor =
+          (HddsProtos.ReplicationFactor) cursor.next();
+
+      try {
+        pipelineManager.createPipeline(type, factor);
+      } catch (IOException ioe) {
+        // not logged here; the factor is simply dropped from this round
+        cursor.remove();
+      } catch (Throwable t) {
+        LOG.error("Error while creating pipelines", t);
+        cursor.remove();
+      }
+    }
+
+    LOG.debug("BackgroundPipelineCreator createPipelines finished.");
+  }
+
+  /**
+   * Re-evaluate the service status under the service lock.
+   *
+   * <p>The service is RUNNING when this SCM is leader and either safe mode
+   * is off or pipeline creation in safe mode is enabled; otherwise it is
+   * PAUSING. The moment of entering RUNNING is recorded.
+   */
+  @Override
+  public void notifyStatusChanged() {
+    serviceLock.lock();
+    try {
+      // 1) SCMContext#isLeader returns true.
+      // 2) not in safe mode or createPipelineInSafeMode is true
+      final boolean eligible = scmContext.isLeader()
+          && (!scmContext.isInSafeMode() || createPipelineInSafeMode);
+
+      if (!eligible) {
+        serviceStatus = ServiceStatus.PAUSING;
+        return;
+      }
+
+      if (serviceStatus == ServiceStatus.RUNNING) {
+        // already running, keep the recorded ready time untouched
+        return;
+      }
+
+      // transition from PAUSING to RUNNING
+      LOG.info("Service {} transitions to RUNNING.", getServiceName());
+      lastTimeToBeReadyInMillis = Time.monotonicNow();
+      serviceStatus = ServiceStatus.RUNNING;
+    } finally {
+      serviceLock.unlock();
+    }
+  }
+
+  /**
+   * React to an event published through SCMServiceManager.
+   *
+   * <p>Ignored when this SCM is not leader. For NEW_NODE_HANDLER_TRIGGERED,
+   * UNHEALTHY_TO_HEALTHY_NODE_HANDLER_TRIGGERED and PRE_CHECK_COMPLETED a
+   * one-shot run is requested and the background thread is woken up.
+   *
+   * @param event the event that was triggered.
+   */
+  @Override
+  public void notifyEventTriggered(Event event) {
+    if (!scmContext.isLeader()) {
+      LOG.info("ignore, not leader SCM.");
+      return;
+    }
+    if (event == NEW_NODE_HANDLER_TRIGGERED
+        || event == UNHEALTHY_TO_HEALTHY_NODE_HANDLER_TRIGGERED
+        || event == PRE_CHECK_COMPLETED) {
+      LOG.info("trigger a one-shot run on {}.", THREAD_NAME);
+
+      // oneShotRun is read and cleared under serviceLock in shouldRun(), so
+      // it has to be written under the same lock to be reliably visible to
+      // the background thread.
+      serviceLock.lock();
+      try {
+        oneShotRun = true;
+      } finally {
+        serviceLock.unlock();
+      }
+
+      synchronized (monitor) {
+        monitor.notifyAll();
+      }
+    }
+  }
+
+  /**
+   * Tell the background thread whether to create pipelines now.
+   *
+   * <p>A pending one-shot request is consumed and always answered with
+   * true. Otherwise the service must be RUNNING and, unless pipeline
+   * creation in safe mode is enabled, the configured wait time must have
+   * elapsed since it became RUNNING.
+   *
+   * @return true if pipelines should be created in this iteration.
+   */
+  @Override
+  public boolean shouldRun() {
+    serviceLock.lock();
+    try {
+      // check one-shot run
+      if (oneShotRun) {
+        oneShotRun = false;
+        return true;
+      }
+
+      if (serviceStatus != ServiceStatus.RUNNING) {
+        return false;
+      }
+      if (createPipelineInSafeMode) {
+        return true;
+      }
+
+      // If safe mode is off, then this SCMService starts to run with a delay.
+      final long elapsedInMillis =
+          Time.monotonicNow() - lastTimeToBeReadyInMillis;
+      return elapsedInMillis >= waitTimeInMillis;
+    } finally {
+      serviceLock.unlock();
+    }
+  }
+
+  /**
+   * Name under which this service is reported.
+   *
+   * <p>NOTE(review): this returns the simple name of
+   * BackgroundPipelineCreator, not of this V2 class; presumably intentional,
+   * confirm.
+   *
+   * @return the simple class name of BackgroundPipelineCreator.
+   */
+  @Override
+  public String getServiceName() {
+    return BackgroundPipelineCreator.class.getSimpleName();
+  }
+}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
index 89d2833..e33f256 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineActionHandler.java
@@ -94,7 +94,7 @@ public class PipelineActionHandler
"firing close pipeline event.", action, pid);
SCMCommand<?> command = new ClosePipelineCommand(pid);
try {
- command.setTerm(scmContext.getTerm());
+ command.setTerm(scmContext.getTermOfLeader());
} catch (NotLeaderException nle) {
LOG.warn("Skip sending ClosePipelineCommand for pipeline {}," +
" since not leader SCM.", pid);
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
index 9f714da..2078460 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManager.java
@@ -28,15 +28,12 @@ import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
-import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.ratis.protocol.exceptions.NotLeaderException;
/**
* Interface which exposes the api for pipeline management.
*/
-public interface PipelineManager extends Closeable, PipelineManagerMXBean,
- EventHandler<SafeModeStatus> {
+public interface PipelineManager extends Closeable, PipelineManagerMXBean {
Pipeline createPipeline(ReplicationType type, ReplicationFactor factor)
throws IOException;
@@ -85,7 +82,7 @@ public interface PipelineManager extends Closeable,
PipelineManagerMXBean,
void startPipelineCreator();
- void triggerPipelineCreation() throws NotLeaderException;
+ void triggerPipelineCreation();
void incNumBlocksAllocatedMetric(PipelineID id);
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
index 3c88174..d0a4c96 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
@@ -30,10 +30,9 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.hdds.utils.Scheduler;
import org.apache.hadoop.hdds.utils.db.Table;
import org.apache.hadoop.metrics2.util.MBeans;
import org.apache.hadoop.util.Time;
@@ -52,7 +51,6 @@ import java.util.Map;
import java.util.NavigableSet;
import java.util.Set;
import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
@@ -69,27 +67,24 @@ public final class PipelineManagerV2Impl implements
PipelineManager {
private final Lock lock;
private PipelineFactory pipelineFactory;
private StateManager stateManager;
- private Scheduler scheduler;
- private BackgroundPipelineCreator backgroundPipelineCreator;
+ private BackgroundPipelineCreatorV2 backgroundPipelineCreator;
private final ConfigurationSource conf;
private final EventPublisher eventPublisher;
// Pipeline Manager MXBean
private ObjectName pmInfoBean;
private final SCMPipelineMetrics metrics;
- private long pipelineWaitDefaultTimeout;
- private final AtomicBoolean isInSafeMode;
- private SCMHAManager scmhaManager;
- private NodeManager nodeManager;
- // Used to track if the safemode pre-checks have completed. This is designed
- // to prevent pipelines being created until sufficient nodes have registered.
- private final AtomicBoolean pipelineCreationAllowed;
+ private final long pipelineWaitDefaultTimeout;
+ private final SCMHAManager scmhaManager;
+ private final SCMContext scmContext;
+ private final NodeManager nodeManager;
private PipelineManagerV2Impl(ConfigurationSource conf,
SCMHAManager scmhaManager,
NodeManager nodeManager,
StateManager pipelineStateManager,
PipelineFactory pipelineFactory,
- EventPublisher eventPublisher) {
+ EventPublisher eventPublisher,
+ SCMContext scmContext) {
this.lock = new ReentrantLock();
this.pipelineFactory = pipelineFactory;
this.stateManager = pipelineStateManager;
@@ -97,6 +92,7 @@ public final class PipelineManagerV2Impl implements
PipelineManager {
this.scmhaManager = scmhaManager;
this.nodeManager = nodeManager;
this.eventPublisher = eventPublisher;
+ this.scmContext = scmContext;
this.pmInfoBean = MBeans.register("SCMPipelineManager",
"SCMPipelineManagerInfo", this);
this.metrics = SCMPipelineMetrics.create();
@@ -104,12 +100,6 @@ public final class PipelineManagerV2Impl implements
PipelineManager {
HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL,
HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL_DEFAULT,
TimeUnit.MILLISECONDS);
- this.isInSafeMode = new AtomicBoolean(conf.getBoolean(
- HddsConfigKeys.HDDS_SCM_SAFEMODE_ENABLED,
- HddsConfigKeys.HDDS_SCM_SAFEMODE_ENABLED_DEFAULT));
- // Pipeline creation is only allowed after the safemode prechecks have
- // passed, eg sufficient nodes have registered.
- this.pipelineCreationAllowed = new AtomicBoolean(!this.isInSafeMode.get());
}
public static PipelineManagerV2Impl newPipelineManager(
@@ -118,7 +108,8 @@ public final class PipelineManagerV2Impl implements
PipelineManager {
NodeManager nodeManager,
Table<PipelineID, Pipeline> pipelineStore,
EventPublisher eventPublisher,
- SCMContext scmContext) throws IOException {
+ SCMContext scmContext,
+ SCMServiceManager serviceManager) throws IOException {
// Create PipelineStateManager
StateManager stateManager = PipelineStateManagerV2Impl
.newBuilder().setPipelineStore(pipelineStore)
@@ -130,18 +121,18 @@ public final class PipelineManagerV2Impl implements
PipelineManager {
// Create PipelineFactory
PipelineFactory pipelineFactory = new PipelineFactory(
nodeManager, stateManager, conf, eventPublisher, scmContext);
+
// Create PipelineManager
PipelineManagerV2Impl pipelineManager = new PipelineManagerV2Impl(conf,
scmhaManager, nodeManager, stateManager, pipelineFactory,
- eventPublisher);
+ eventPublisher, scmContext);
// Create background thread.
- Scheduler scheduler = new Scheduler(
- "RatisPipelineUtilsThread", false, 1);
- BackgroundPipelineCreator backgroundPipelineCreator =
- new BackgroundPipelineCreator(pipelineManager, scheduler, conf);
+ BackgroundPipelineCreatorV2 backgroundPipelineCreator =
+ new BackgroundPipelineCreatorV2(
+ pipelineManager, conf, serviceManager, scmContext);
+
pipelineManager.setBackgroundPipelineCreator(backgroundPipelineCreator);
- pipelineManager.setScheduler(scheduler);
return pipelineManager;
}
@@ -385,17 +376,15 @@ public final class PipelineManagerV2Impl implements
PipelineManager {
*/
@Override
public void startPipelineCreator() {
- backgroundPipelineCreator.startFixedIntervalPipelineCreator();
+ throw new RuntimeException("Not supported in HA code.");
}
/**
* Triggers pipeline creation after the specified time.
*/
@Override
- public void triggerPipelineCreation() throws NotLeaderException {
- // TODO add checkLeader once follower validates safemode
- // before it becomes leader.
- backgroundPipelineCreator.triggerPipelineCreation();
+ public void triggerPipelineCreation() {
+ throw new RuntimeException("Not supported in HA code.");
}
@Override
@@ -497,14 +486,13 @@ public final class PipelineManagerV2Impl implements
PipelineManager {
*/
@Override
public boolean getSafeModeStatus() {
- return this.isInSafeMode.get();
+ return scmContext.isInSafeMode();
}
@Override
public void close() throws IOException {
- if (scheduler != null) {
- scheduler.close();
- scheduler = null;
+ if (backgroundPipelineCreator != null) {
+ backgroundPipelineCreator.stop();
}
if(pmInfoBean != null) {
@@ -523,40 +511,9 @@ public final class PipelineManagerV2Impl implements
PipelineManager {
}
}
- @Override
- public void onMessage(SCMSafeModeManager.SafeModeStatus status,
- EventPublisher publisher) {
- // TODO: #CLUTIL - handle safemode getting re-enabled
- boolean currentAllowPipelines =
- pipelineCreationAllowed.getAndSet(status.isPreCheckComplete());
- boolean currentlyInSafeMode =
- isInSafeMode.getAndSet(status.isInSafeMode());
-
- // Trigger pipeline creation only if the preCheck status has changed to
- // complete.
-
- try {
- if (isPipelineCreationAllowed() && !currentAllowPipelines) {
- triggerPipelineCreation();
- }
- // Start the pipeline creation thread only when safemode switches off
- if (!getSafeModeStatus() && currentlyInSafeMode) {
- startPipelineCreator();
- }
- } catch (NotLeaderException ex) {
- LOG.warn("Not leader SCM, cannot process pipeline creation.");
- }
-
- }
-
@VisibleForTesting
public boolean isPipelineCreationAllowed() {
- return pipelineCreationAllowed.get();
- }
-
- @VisibleForTesting
- public void allowPipelineCreation() {
- this.pipelineCreationAllowed.set(true);
+ return scmContext.isLeader() && scmContext.isPreCheckComplete();
}
@VisibleForTesting
@@ -576,12 +533,13 @@ public final class PipelineManagerV2Impl implements
PipelineManager {
}
private void setBackgroundPipelineCreator(
- BackgroundPipelineCreator backgroundPipelineCreator) {
+ BackgroundPipelineCreatorV2 backgroundPipelineCreator) {
this.backgroundPipelineCreator = backgroundPipelineCreator;
}
- private void setScheduler(Scheduler scheduler) {
- this.scheduler = scheduler;
+ @VisibleForTesting
+ public BackgroundPipelineCreatorV2 getBackgroundPipelineCreator() {
+ return this.backgroundPipelineCreator;
}
private void recordMetricsForPipeline(Pipeline pipeline) {
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
index ca51433..8fc7f3e 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineReportHandler.java
@@ -103,7 +103,7 @@ public class PipelineReportHandler implements
pipeline = pipelineManager.getPipeline(pipelineID);
} catch (PipelineNotFoundException e) {
SCMCommand<?> command = new ClosePipelineCommand(pipelineID);
- command.setTerm(scmContext.getTerm());
+ command.setTerm(scmContext.getTermOfLeader());
publisher.fireEvent(SCMEvents.DATANODE_COMMAND,
new CommandForDatanode<>(dn.getUuid(), command));
return;
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
index ede3b1e..e485a28 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/RatisPipelineProvider.java
@@ -163,7 +163,7 @@ public class RatisPipelineProvider extends PipelineProvider
{
new CreatePipelineCommand(pipeline.getId(), pipeline.getType(),
factor, dns);
- createCommand.setTerm(scmContext.getTerm());
+ createCommand.setTerm(scmContext.getTermOfLeader());
dns.forEach(node -> {
LOG.info("Sending CreatePipelineCommand for pipeline:{} to datanode:{}",
@@ -201,7 +201,7 @@ public class RatisPipelineProvider extends PipelineProvider
{
public void close(Pipeline pipeline) throws NotLeaderException {
final ClosePipelineCommand closeCommand =
new ClosePipelineCommand(pipeline.getId());
- closeCommand.setTerm(scmContext.getTerm());
+ closeCommand.setTerm(scmContext.getTermOfLeader());
pipeline.getNodes().forEach(node -> {
final CommandForDatanode<?> datanodeCommand =
new CommandForDatanode<>(node.getUuid(), closeCommand);
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index 892e3bc..a1cef22 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@ -45,6 +45,7 @@ import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
+import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.utils.Scheduler;
import org.apache.hadoop.hdds.utils.db.Table;
@@ -64,7 +65,8 @@ import static
org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.FAI
* for pipelines must come via PipelineManager. It synchronises all write
* and read operations via a ReadWriteLock.
*/
-public class SCMPipelineManager implements PipelineManager {
+public class SCMPipelineManager implements
+ PipelineManager, EventHandler<SafeModeStatus> {
private static final Logger LOG =
LoggerFactory.getLogger(SCMPipelineManager.class);
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java
index 26fb806..e4e069a 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/safemode/SCMSafeModeManager.java
@@ -22,7 +22,6 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang3.tuple.Pair;
@@ -30,6 +29,9 @@ import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.ConfigurationSource;
import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMService.Event;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.server.events.EventPublisher;
import org.apache.hadoop.hdds.server.events.EventQueue;
@@ -83,7 +85,6 @@ public class SCMSafeModeManager implements SafeModeManager {
private static final Logger LOG =
LoggerFactory.getLogger(SCMSafeModeManager.class);
private final boolean isSafeModeEnabled;
- private final long waitTime;
private AtomicBoolean inSafeMode = new AtomicBoolean(true);
private AtomicBoolean preCheckComplete = new AtomicBoolean(false);
@@ -102,25 +103,24 @@ public class SCMSafeModeManager implements
SafeModeManager {
private final EventQueue eventPublisher;
private final PipelineManager pipelineManager;
+ private final SCMServiceManager serviceManager;
+ private final SCMContext scmContext;
private final SafeModeMetrics safeModeMetrics;
public SCMSafeModeManager(ConfigurationSource conf,
List<ContainerInfo> allContainers, PipelineManager pipelineManager,
- EventQueue eventQueue) {
+ EventQueue eventQueue, SCMServiceManager serviceManager,
+ SCMContext scmContext) {
this.config = conf;
this.pipelineManager = pipelineManager;
this.eventPublisher = eventQueue;
+ this.serviceManager = serviceManager;
+ this.scmContext = scmContext;
this.isSafeModeEnabled = conf.getBoolean(
HddsConfigKeys.HDDS_SCM_SAFEMODE_ENABLED,
HddsConfigKeys.HDDS_SCM_SAFEMODE_ENABLED_DEFAULT);
-
- this.waitTime = conf.getTimeDuration(
- HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
- HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT_DEFAULT,
- TimeUnit.MILLISECONDS);
-
if (isSafeModeEnabled) {
this.safeModeMetrics = SafeModeMetrics.create();
ContainerSafeModeRule containerSafeModeRule =
@@ -147,13 +147,6 @@ public class SCMSafeModeManager implements SafeModeManager
{
exitRules.put(ATLEAST_ONE_DATANODE_REPORTED_PIPELINE_EXIT_RULE,
oneReplicaPipelineSafeModeRule);
}
- boolean createPipelineInSafemode = conf.getBoolean(
- HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION,
- HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION_DEFAULT);
-
- if (createPipelineInSafemode) {
- pipelineManager.startPipelineCreator();
- }
} else {
this.safeModeMetrics = null;
exitSafeMode(eventQueue);
@@ -177,28 +170,22 @@ public class SCMSafeModeManager implements
SafeModeManager {
public void emitSafeModeStatus() {
SafeModeStatus safeModeStatus =
new SafeModeStatus(getInSafeMode(), getPreCheckComplete());
+ // TODO: remove eventPublisher,
+ // since there will be no consumer of SAFE_MODE_STATUS in the future.
eventPublisher.fireEvent(SCMEvents.SAFE_MODE_STATUS,
safeModeStatus);
- // Only notify the delayed listeners if safemode remains on, as precheck
- // may have completed.
- if (safeModeStatus.isInSafeMode()) {
- eventPublisher.fireEvent(SCMEvents.DELAYED_SAFE_MODE_STATUS,
- safeModeStatus);
- } else {
+ // update SCMContext
+ scmContext.updateSafeModeStatus(safeModeStatus);
+
+ // notify SCMServiceManager
+ if (!safeModeStatus.isInSafeMode()) {
// If safemode is off, then notify the delayed listeners with a delay.
- final Thread safeModeExitThread = new Thread(() -> {
- try {
- Thread.sleep(waitTime);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- eventPublisher.fireEvent(SCMEvents.DELAYED_SAFE_MODE_STATUS,
- safeModeStatus);
- });
-
- safeModeExitThread.setDaemon(true);
- safeModeExitThread.start();
+ serviceManager.notifyStatusChanged();
+ } else if (safeModeStatus.isPreCheckComplete()) {
+ // Only notify the delayed listeners if safemode remains on, as precheck
+ // may have completed.
+ serviceManager.notifyEventTriggered(Event.PRE_CHECK_COMPLETED);
}
}
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index 6782f52..809dda9 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -55,6 +55,7 @@ import
org.apache.hadoop.hdds.scm.container.ContainerManagerV2;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.ha.SCMHAManager;
import org.apache.hadoop.hdds.scm.ha.SCMHAManagerImpl;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
import org.apache.hadoop.hdds.utils.HddsServerUtil;
import org.apache.hadoop.hdds.scm.ScmConfig;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
@@ -70,7 +71,6 @@ import org.apache.hadoop.hdds.scm.container.ContainerInfo;
import org.apache.hadoop.hdds.scm.container.ContainerReportHandler;
import org.apache.hadoop.hdds.scm.container.IncrementalContainerReportHandler;
import org.apache.hadoop.hdds.scm.container.ReplicationManager;
-import
org.apache.hadoop.hdds.scm.container.ReplicationManager.ReplicationManagerConfiguration;
import
org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacementPolicyFactory;
import
org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPlacementMetrics;
import org.apache.hadoop.hdds.scm.container.placement.metrics.ContainerStat;
@@ -174,6 +174,8 @@ public final class StorageContainerManager extends
ServiceRuntimeInfoImpl
private SCMContext scmContext;
private final EventQueue eventQueue;
+ private final SCMServiceManager serviceManager;
+
/*
* HTTP endpoint for JMX access.
*/
@@ -284,6 +286,8 @@ public final class StorageContainerManager extends
ServiceRuntimeInfoImpl
}
eventQueue = new EventQueue();
+ serviceManager = new SCMServiceManager();
+
long watcherTimeout =
conf.getTimeDuration(ScmConfigKeys.HDDS_SCM_WATCHER_TIMEOUT,
HDDS_SCM_WATCHER_TIMEOUT_DEFAULT, TimeUnit.MILLISECONDS);
@@ -303,7 +307,7 @@ public final class StorageContainerManager extends
ServiceRuntimeInfoImpl
new CommandStatusReportHandler();
NewNodeHandler newNodeHandler = new NewNodeHandler(pipelineManager,
- scmDecommissionManager, conf);
+ scmDecommissionManager, conf, serviceManager);
StaleNodeHandler staleNodeHandler =
new StaleNodeHandler(scmNodeManager, pipelineManager, conf);
DeadNodeHandler deadNodeHandler = new DeadNodeHandler(scmNodeManager,
@@ -311,7 +315,7 @@ public final class StorageContainerManager extends
ServiceRuntimeInfoImpl
StartDatanodeAdminHandler datanodeStartAdminHandler =
new StartDatanodeAdminHandler(scmNodeManager, pipelineManager);
NonHealthyToHealthyNodeHandler nonHealthyToHealthyNodeHandler =
- new NonHealthyToHealthyNodeHandler(pipelineManager, conf);
+ new NonHealthyToHealthyNodeHandler(conf, serviceManager);
ContainerActionsHandler actionsHandler = new ContainerActionsHandler();
PendingDeleteHandler pendingDeleteHandler =
new PendingDeleteHandler(scmBlockManager.getSCMBlockDeletingService());
@@ -361,14 +365,6 @@ public final class StorageContainerManager extends
ServiceRuntimeInfoImpl
(DeletedBlockLogImpl) scmBlockManager.getDeletedBlockLog());
eventQueue.addHandler(SCMEvents.PIPELINE_ACTIONS, pipelineActionHandler);
eventQueue.addHandler(SCMEvents.PIPELINE_REPORT, pipelineReportHandler);
- eventQueue.addHandler(SCMEvents.SAFE_MODE_STATUS, scmContext);
- // TODO:
- // handle replicationManager and pipelineManager in ServiceManager
- eventQueue
- .addHandler(SCMEvents.DELAYED_SAFE_MODE_STATUS, replicationManager);
- eventQueue
- .addHandler(SCMEvents.DELAYED_SAFE_MODE_STATUS, pipelineManager);
-
// Emit initial safe mode status, as now handlers are registered.
scmSafeModeManager.emitSafeModeStatus();
@@ -436,7 +432,14 @@ public final class StorageContainerManager extends
ServiceRuntimeInfoImpl
if (configurator.getScmContext() != null) {
scmContext = configurator.getScmContext();
} else {
- scmContext = new SCMContext(this);
+ // non-leader of term 0, in safe mode, preCheck not completed.
+ scmContext = new SCMContext.Builder()
+ .setLeader(false)
+ .setTerm(0)
+ .setIsInSafeMode(true)
+ .setIsPreCheckComplete(false)
+ .setSCM(this)
+ .build();
}
if(configurator.getScmNodeManager() != null) {
@@ -461,7 +464,8 @@ public final class StorageContainerManager extends
ServiceRuntimeInfoImpl
scmNodeManager,
scmMetadataStore.getPipelineTable(),
eventQueue,
- scmContext);
+ scmContext,
+ serviceManager);
}
if (configurator.getContainerManager() != null) {
@@ -481,11 +485,12 @@ public final class StorageContainerManager extends
ServiceRuntimeInfoImpl
replicationManager = configurator.getReplicationManager();
} else {
replicationManager = new ReplicationManager(
- conf.getObject(ReplicationManagerConfiguration.class),
+ conf,
containerManager,
containerPlacementPolicy,
eventQueue,
scmContext,
+ serviceManager,
new LockManager<>(conf),
scmNodeManager);
}
@@ -494,7 +499,7 @@ public final class StorageContainerManager extends
ServiceRuntimeInfoImpl
} else {
scmSafeModeManager = new SCMSafeModeManager(conf,
containerManager.getContainers(),
- pipelineManager, eventQueue);
+ pipelineManager, eventQueue, serviceManager, scmContext);
}
scmDecommissionManager = new NodeDecommissionManager(conf, scmNodeManager,
containerManager, eventQueue, replicationManager);
@@ -1166,6 +1171,13 @@ public final class StorageContainerManager extends
ServiceRuntimeInfoImpl
}
/**
+ * Returns SCMServiceManager.
+ */
+ public SCMServiceManager getSCMServiceManager() {
+ return serviceManager;
+ }
+
+ /**
* Force SCM out of safe mode.
*/
public boolean exitSafeMode() {
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
index c8c8243..a2a13e3 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMCommandProto;
import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.TestUtils;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.scm.container.CloseContainerEventHandler;
import org.apache.hadoop.hdds.scm.container.ContainerID;
@@ -91,6 +92,7 @@ public class TestBlockManager {
private static HddsProtos.ReplicationType type;
private EventQueue eventQueue;
private SCMContext scmContext;
+ private SCMServiceManager serviceManager;
private int numContainerPerOwnerInPipeline;
private OzoneConfiguration conf;
@@ -121,6 +123,7 @@ public class TestBlockManager {
eventQueue = new EventQueue();
scmContext = SCMContext.emptyContext();
+ serviceManager = new SCMServiceManager();
scmMetadataStore = new SCMMetadataStoreImpl(conf);
scmMetadataStore.start(conf);
@@ -131,8 +134,8 @@ public class TestBlockManager {
nodeManager,
scmMetadataStore.getPipelineTable(),
eventQueue,
- scmContext);
- pipelineManager.allowPipelineCreation();
+ scmContext,
+ serviceManager);
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
@@ -146,7 +149,7 @@ public class TestBlockManager {
scmMetadataStore.getContainerTable());
SCMSafeModeManager safeModeManager = new SCMSafeModeManager(conf,
containerManager.getContainers(),
- pipelineManager, eventQueue) {
+ pipelineManager, eventQueue, serviceManager, scmContext) {
@Override
public void emitSafeModeStatus() {
// skip
@@ -173,8 +176,7 @@ public class TestBlockManager {
factor = HddsProtos.ReplicationFactor.THREE;
type = HddsProtos.ReplicationType.RATIS;
- scm.getScmContext().onMessage(
- new SafeModeStatus(false, true), null);
+ scm.getScmContext().updateSafeModeStatus(new SafeModeStatus(false, true));
}
@After
@@ -460,8 +462,8 @@ public class TestBlockManager {
@Test
public void testAllocateBlockFailureInSafeMode() throws Exception {
- scm.getScmContext().onMessage(
- new SCMSafeModeManager.SafeModeStatus(true, true), null);
+ scm.getScmContext().updateSafeModeStatus(
+ new SCMSafeModeManager.SafeModeStatus(true, true));
// Test1: In safe mode expect an SCMException.
thrown.expectMessage("SafeModePrecheck failed for "
+ "allocateBlock");
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
index 6d5fee2..d42db5d 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
@@ -30,6 +30,8 @@ import org.apache.hadoop.hdds.scm.ScmConfigKeys;
import org.apache.hadoop.hdds.scm.TestUtils;
import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMService.Event;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
@@ -80,6 +82,8 @@ public class TestCloseContainerEventHandler {
scmContext = SCMContext.emptyContext();
scmMetadataStore = new SCMMetadataStoreImpl(configuration);
+ SCMServiceManager serviceManager = new SCMServiceManager();
+
pipelineManager =
PipelineManagerV2Impl.newPipelineManager(
configuration,
@@ -87,9 +91,9 @@ public class TestCloseContainerEventHandler {
nodeManager,
scmMetadataStore.getPipelineTable(),
eventQueue,
- scmContext);
+ scmContext,
+ serviceManager);
- pipelineManager.allowPipelineCreation();
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
pipelineManager.getStateManager(), configuration, eventQueue);
@@ -99,7 +103,10 @@ public class TestCloseContainerEventHandler {
MockSCMHAManager.getInstance(true),
pipelineManager,
scmMetadataStore.getContainerTable());
- pipelineManager.triggerPipelineCreation();
+
+ // trigger BackgroundPipelineCreator to take effect.
+ serviceManager.notifyEventTriggered(Event.PRE_CHECK_COMPLETED);
+
eventQueue.addHandler(CLOSE_CONTAINER,
new CloseContainerEventHandler(
pipelineManager, containerManager, scmContext));
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
index d926a02..6e2e3ce 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestReplicationManager.java
@@ -19,7 +19,7 @@
package org.apache.hadoop.hdds.scm.container;
import com.google.common.primitives.Longs;
-import org.apache.hadoop.hdds.conf.ConfigurationSource;
+import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.DatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState;
@@ -34,6 +34,7 @@ import
org.apache.hadoop.hdds.scm.container.placement.algorithms.ContainerPlacem
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.scm.node.SCMNodeManager;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
@@ -58,6 +59,7 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Function;
@@ -88,13 +90,17 @@ public class TestReplicationManager {
private DatanodeCommandHandler datanodeCommandHandler;
private SimpleMockNodeManager nodeManager;
private ContainerManagerV2 containerManager;
- private ConfigurationSource conf;
+ private OzoneConfiguration conf;
private SCMNodeManager scmNodeManager;
@Before
public void setup()
throws IOException, InterruptedException, NodeNotFoundException {
conf = new OzoneConfiguration();
+ conf.setTimeDuration(
+ HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
+ 0, TimeUnit.SECONDS);
+
containerManager = Mockito.mock(ContainerManagerV2.class);
nodeManager = new SimpleMockNodeManager();
eventQueue = new EventQueue();
@@ -147,30 +153,43 @@ public class TestReplicationManager {
Mockito.any(DatanodeDetails.class)))
.thenReturn(NodeStatus.inServiceHealthy());
+ SCMServiceManager serviceManager = new SCMServiceManager();
+
replicationManager = new ReplicationManager(
- new ReplicationManagerConfiguration(),
+ conf,
containerManager,
containerPlacementPolicy,
eventQueue,
SCMContext.emptyContext(),
+ serviceManager,
new LockManager<>(conf),
nodeManager);
- replicationManager.start();
+
+ serviceManager.notifyStatusChanged();
Thread.sleep(100L);
}
private void createReplicationManager(ReplicationManagerConfiguration rmConf)
throws InterruptedException {
+ OzoneConfiguration config = new OzoneConfiguration();
+ config.setTimeDuration(
+ HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
+ 0, TimeUnit.SECONDS);
+ config.setFromObject(rmConf);
+
+ SCMServiceManager serviceManager = new SCMServiceManager();
+
replicationManager = new ReplicationManager(
- rmConf,
+ config,
containerManager,
containerPlacementPolicy,
eventQueue,
SCMContext.emptyContext(),
- new LockManager<ContainerID>(conf),
+ serviceManager,
+ new LockManager<ContainerID>(config),
nodeManager);
- replicationManager.start();
+ serviceManager.notifyStatusChanged();
Thread.sleep(100L);
}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
index ba0cba5..8f9bc5d 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
@@ -44,6 +44,7 @@ import
org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolPro
import org.apache.hadoop.hdds.scm.XceiverClientManager;
import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
@@ -100,8 +101,8 @@ public class TestSCMContainerManager {
nodeManager,
scmMetadataStore.getPipelineTable(),
new EventQueue(),
- SCMContext.emptyContext());
- pipelineManager.allowPipelineCreation();
+ SCMContext.emptyContext(),
+ new SCMServiceManager());
containerManager = new SCMContainerManager(conf,
scmMetadataStore.getContainerTable(),
scmMetadataStore.getStore(),
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMContext.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMContext.java
index a8e4c00..c809880 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMContext.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMContext.java
@@ -34,33 +34,43 @@ public class TestSCMContext {
@Test
public void testRaftOperations() {
// start as follower
- SCMContext scmContext = new SCMContext(false, 0, null, null);
+ SCMContext scmContext =
+ new SCMContext.Builder().setLeader(false).setTerm(0).build();
+
assertFalse(scmContext.isLeader());
// become leader
- scmContext.updateIsLeaderAndTerm(true, 10);
+ scmContext.updateLeaderAndTerm(true, 10);
assertTrue(scmContext.isLeader());
try {
- assertEquals(scmContext.getTerm(), 10);
+ assertEquals(scmContext.getTermOfLeader(), 10);
} catch (NotLeaderException e) {
fail("Should not throw nle.");
}
// step down
- scmContext.updateIsLeaderAndTerm(false, 0);
+ scmContext.updateLeaderAndTerm(false, 0);
assertFalse(scmContext.isLeader());
}
@Test
public void testSafeModeOperations() {
// in safe mode
- SCMContext scmContext = new SCMContext(
- true, 0, new SafeModeStatus(true, false), null);
+ SCMContext scmContext = new SCMContext.Builder()
+ .setIsInSafeMode(true)
+ .setIsPreCheckComplete(false)
+ .build();
+
assertTrue(scmContext.isInSafeMode());
assertFalse(scmContext.isPreCheckComplete());
+ // in safe mode, pass preCheck
+ scmContext.updateSafeModeStatus(new SafeModeStatus(true, true));
+ assertTrue(scmContext.isInSafeMode());
+ assertTrue(scmContext.isPreCheckComplete());
+
// out of safe mode
- scmContext.onMessage(new SafeModeStatus(false, true), null);
+ scmContext.updateSafeModeStatus(new SafeModeStatus(false, true));
assertFalse(scmContext.isInSafeMode());
assertTrue(scmContext.isPreCheckComplete());
}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMServiceManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMServiceManager.java
new file mode 100644
index 0000000..47b4537
--- /dev/null
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/ha/TestSCMServiceManager.java
@@ -0,0 +1,151 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.hdds.scm.ha;
+
+import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
+import org.junit.Test;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+
+public class TestSCMServiceManager {
+ @Test
+ public void testServiceRunWhenLeader() {
+ SCMContext scmContext = new SCMContext.Builder()
+ .setLeader(false)
+ .setTerm(1)
+ .setIsInSafeMode(true)
+ .setIsPreCheckComplete(false)
+ .build();
+
+ // A service runs when it is leader.
+ SCMService serviceRunWhenLeader = new SCMService() {
+ private ServiceStatus serviceStatus = ServiceStatus.PAUSING;
+
+ @Override
+ public void notifyStatusChanged() {
+ if (scmContext.isLeader()) {
+ serviceStatus = ServiceStatus.RUNNING;
+ } else {
+ serviceStatus = ServiceStatus.PAUSING;
+ }
+ }
+
+ @Override
+ public boolean shouldRun() {
+ return serviceStatus == ServiceStatus.RUNNING;
+ }
+
+ @Override
+ public String getServiceName() {
+ return "serviceRunWhenLeader";
+ }
+ };
+
+ SCMServiceManager serviceManager = new SCMServiceManager();
+ serviceManager.register(serviceRunWhenLeader);
+
+ // PAUSING at the beginning.
+ assertFalse(serviceRunWhenLeader.shouldRun());
+
+ // PAUSING when out of safe mode.
+ scmContext.updateSafeModeStatus(
+ new SCMSafeModeManager.SafeModeStatus(false, true));
+ serviceManager.notifyStatusChanged();
+ assertFalse(serviceRunWhenLeader.shouldRun());
+
+ // RUNNING when becoming leader.
+ scmContext.updateLeaderAndTerm(true, 2);
+ serviceManager.notifyStatusChanged();
+ assertTrue(serviceRunWhenLeader.shouldRun());
+
+ // RUNNING when in safe mode.
+ scmContext.updateSafeModeStatus(
+ new SCMSafeModeManager.SafeModeStatus(true, false));
+ serviceManager.notifyStatusChanged();
+ assertTrue(serviceRunWhenLeader.shouldRun());
+
+ // PAUSING when stepping down.
+ scmContext.updateLeaderAndTerm(false, 3);
+ serviceManager.notifyStatusChanged();
+ assertFalse(serviceRunWhenLeader.shouldRun());
+ }
+
+ @Test
+ public void setServiceRunWhenLeaderAndOutOfSafeMode() {
+ SCMContext scmContext = new SCMContext.Builder()
+ .setLeader(false)
+ .setTerm(1)
+ .setIsInSafeMode(true)
+ .setIsPreCheckComplete(false)
+ .build();
+
+ // A service runs when it is leader and out of safe mode.
+ SCMService serviceRunWhenLeaderAndOutOfSafeMode = new SCMService() {
+ private ServiceStatus serviceStatus = ServiceStatus.PAUSING;
+
+ @Override
+ public void notifyStatusChanged() {
+ if (scmContext.isLeader() && !scmContext.isInSafeMode()) {
+ serviceStatus = ServiceStatus.RUNNING;
+ } else {
+ serviceStatus = ServiceStatus.PAUSING;
+ }
+ }
+
+ @Override
+ public boolean shouldRun() {
+ return serviceStatus == ServiceStatus.RUNNING;
+ }
+
+ @Override
+ public String getServiceName() {
+ return "serviceRunWhenLeaderAndOutOfSafeMode";
+ }
+ };
+
+ SCMServiceManager serviceManager = new SCMServiceManager();
+ serviceManager.register(serviceRunWhenLeaderAndOutOfSafeMode);
+
+ // PAUSING at the beginning.
+ assertFalse(serviceRunWhenLeaderAndOutOfSafeMode.shouldRun());
+
+ // PAUSING when out of safe mode.
+ scmContext.updateSafeModeStatus(
+ new SCMSafeModeManager.SafeModeStatus(false, true));
+ serviceManager.notifyStatusChanged();
+ assertFalse(serviceRunWhenLeaderAndOutOfSafeMode.shouldRun());
+
+ // RUNNING when becoming leader.
+ scmContext.updateLeaderAndTerm(true, 2);
+ serviceManager.notifyStatusChanged();
+ assertTrue(serviceRunWhenLeaderAndOutOfSafeMode.shouldRun());
+
+ // PAUSING when in safe mode.
+ scmContext.updateSafeModeStatus(
+ new SCMSafeModeManager.SafeModeStatus(true, false));
+ serviceManager.notifyStatusChanged();
+ assertFalse(serviceRunWhenLeaderAndOutOfSafeMode.shouldRun());
+
+ // PAUSING when stepping down.
+ scmContext.updateLeaderAndTerm(false, 3);
+ serviceManager.notifyStatusChanged();
+ assertFalse(serviceRunWhenLeaderAndOutOfSafeMode.shouldRun());
+ }
+}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
index 5bcdf4b..eb76e9f 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
@@ -38,6 +38,7 @@ import
org.apache.hadoop.hdds.scm.container.placement.algorithms.SCMContainerPla
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
@@ -123,7 +124,8 @@ public class TestContainerPlacement {
scmNodeManager,
scmMetadataStore.getPipelineTable(),
eventQueue,
- SCMContext.emptyContext());
+ SCMContext.emptyContext(),
+ new SCMServiceManager());
return new SCMContainerManager(config,
scmMetadataStore.getContainerTable(),
scmMetadataStore.getStore(),
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
index 78e7718..23b7950 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
@@ -162,7 +162,6 @@ public class TestDeadNodeHandler {
LambdaTestUtils.await(120000, 1000,
() -> {
- pipelineManager.triggerPipelineCreation();
System.out.println(pipelineManager.getPipelines(RATIS,
THREE).size());
System.out.println(pipelineManager.getPipelines(RATIS, ONE).size());
return pipelineManager.getPipelines(RATIS, THREE).size() > 3;
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
index 947cd37..1759245 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/MockPipelineManager.java
@@ -22,8 +22,6 @@ import org.apache.hadoop.hdds.protocol.MockDatanodeDetails;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
import java.io.IOException;
import java.util.Collection;
@@ -228,10 +226,4 @@ public final class MockPipelineManager implements
PipelineManager {
public Map<String, Integer> getPipelineInfo() {
return null;
}
-
- @Override
- public void onMessage(final SCMSafeModeManager.SafeModeStatus safeModeStatus,
- final EventPublisher publisher) {
-
- }
}
\ No newline at end of file
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
index 654fd9c..862e19b 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
@@ -31,6 +31,7 @@ import org.apache.hadoop.hdds.scm.ha.DBTransactionBuffer;
import org.apache.hadoop.hdds.scm.ha.MockDBTransactionBuffer;
import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
import org.apache.hadoop.hdds.scm.metadata.SCMDBDefinition;
import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
@@ -71,6 +72,8 @@ public class TestPipelineManagerImpl {
private DBStore dbStore;
private static MockNodeManager nodeManager;
private static int maxPipelineCount;
+ private SCMContext scmContext;
+ private SCMServiceManager serviceManager;
@Before
public void init() throws Exception {
@@ -86,6 +89,8 @@ public class TestPipelineManagerImpl {
conf.getInt(OZONE_DATANODE_PIPELINE_LIMIT,
OZONE_DATANODE_PIPELINE_LIMIT_DEFAULT) /
HddsProtos.ReplicationFactor.THREE.getNumber();
+ scmContext = SCMContext.emptyContext();
+ serviceManager = new SCMServiceManager();
}
@After
@@ -103,7 +108,8 @@ public class TestPipelineManagerImpl {
new MockNodeManager(true, 20),
SCMDBDefinition.PIPELINES.getTable(dbStore),
new EventQueue(),
- SCMContext.emptyContext());
+ scmContext,
+ serviceManager);
}
private PipelineManagerV2Impl createPipelineManager(
@@ -113,7 +119,8 @@ public class TestPipelineManagerImpl {
new MockNodeManager(true, 20),
SCMDBDefinition.PIPELINES.getTable(dbStore),
new EventQueue(),
- SCMContext.emptyContext());
+ SCMContext.emptyContext(),
+ serviceManager);
}
@Test
@@ -122,7 +129,6 @@ public class TestPipelineManagerImpl {
PipelineManagerV2Impl pipelineManager =
createPipelineManager(true, buffer1);
Assert.assertTrue(pipelineManager.getPipelines().isEmpty());
- pipelineManager.allowPipelineCreation();
Pipeline pipeline1 = pipelineManager.createPipeline(
HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE);
Assert.assertEquals(1, pipelineManager.getPipelines().size());
@@ -141,7 +147,6 @@ public class TestPipelineManagerImpl {
// Should be able to load previous pipelines.
Assert.assertFalse(pipelineManager2.getPipelines().isEmpty());
Assert.assertEquals(2, pipelineManager.getPipelines().size());
- pipelineManager2.allowPipelineCreation();
Pipeline pipeline3 = pipelineManager2.createPipeline(
HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE);
buffer2.close();
@@ -155,7 +160,6 @@ public class TestPipelineManagerImpl {
public void testCreatePipelineShouldFailOnFollower() throws Exception {
PipelineManagerV2Impl pipelineManager = createPipelineManager(false);
Assert.assertTrue(pipelineManager.getPipelines().isEmpty());
- pipelineManager.allowPipelineCreation();
try {
pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
@@ -174,7 +178,6 @@ public class TestPipelineManagerImpl {
createPipelineManager(true, buffer);
Table<PipelineID, Pipeline> pipelineStore =
SCMDBDefinition.PIPELINES.getTable(dbStore);
- pipelineManager.allowPipelineCreation();
Pipeline pipeline = pipelineManager.createPipeline(
HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE);
Assert.assertEquals(1, pipelineManager.getPipelines().size());
@@ -218,7 +221,6 @@ public class TestPipelineManagerImpl {
@Test
public void testOpenPipelineShouldFailOnFollower() throws Exception {
PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
- pipelineManager.allowPipelineCreation();
Pipeline pipeline = pipelineManager.createPipeline(
HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE);
Assert.assertEquals(1, pipelineManager.getPipelines().size());
@@ -240,7 +242,6 @@ public class TestPipelineManagerImpl {
@Test
public void testActivatePipelineShouldFailOnFollower() throws Exception {
PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
- pipelineManager.allowPipelineCreation();
Pipeline pipeline = pipelineManager.createPipeline(
HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE);
Assert.assertEquals(1, pipelineManager.getPipelines().size());
@@ -262,7 +263,6 @@ public class TestPipelineManagerImpl {
@Test
public void testDeactivatePipelineShouldFailOnFollower() throws Exception {
PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
- pipelineManager.allowPipelineCreation();
Pipeline pipeline = pipelineManager.createPipeline(
HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE);
Assert.assertEquals(1, pipelineManager.getPipelines().size());
@@ -284,7 +284,6 @@ public class TestPipelineManagerImpl {
@Test
public void testRemovePipeline() throws Exception {
PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
- pipelineManager.allowPipelineCreation();
// Create a pipeline
Pipeline pipeline = pipelineManager.createPipeline(
HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE);
@@ -327,7 +326,6 @@ public class TestPipelineManagerImpl {
@Test
public void testClosePipelineShouldFailOnFollower() throws Exception {
PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
- pipelineManager.allowPipelineCreation();
Pipeline pipeline = pipelineManager.createPipeline(
HddsProtos.ReplicationType.RATIS, HddsProtos.ReplicationFactor.THREE);
Assert.assertEquals(1, pipelineManager.getPipelines().size());
@@ -349,10 +347,9 @@ public class TestPipelineManagerImpl {
@Test
public void testPipelineReport() throws Exception {
PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
- pipelineManager.allowPipelineCreation();
SCMSafeModeManager scmSafeModeManager =
new SCMSafeModeManager(conf, new ArrayList<>(), pipelineManager,
- new EventQueue());
+ new EventQueue(), serviceManager, scmContext);
Pipeline pipeline = pipelineManager
.createPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
@@ -400,7 +397,6 @@ public class TestPipelineManagerImpl {
@Test
public void testPipelineCreationFailedMetric() throws Exception {
PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
- pipelineManager.allowPipelineCreation();
// No pipeline at start
MetricsRecordBuilder metrics = getMetrics(
@@ -457,10 +453,7 @@ public class TestPipelineManagerImpl {
DBTransactionBuffer buffer1 = new MockDBTransactionBuffer(dbStore);
PipelineManagerV2Impl pipelineManager =
createPipelineManager(true, buffer1);
- pipelineManager.allowPipelineCreation();
- pipelineManager.onMessage(
- new SCMSafeModeManager.SafeModeStatus(true, true), null);
Pipeline pipeline = pipelineManager
.createPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
@@ -473,8 +466,8 @@ public class TestPipelineManagerImpl {
pipelineManager.getPipeline(pipeline.getId()).getPipelineState());
SCMSafeModeManager scmSafeModeManager =
- new SCMSafeModeManager(new OzoneConfiguration(),
- new ArrayList<>(), pipelineManager, new EventQueue());
+ new SCMSafeModeManager(new OzoneConfiguration(), new ArrayList<>(),
+ pipelineManager, new EventQueue(), serviceManager, scmContext);
PipelineReportHandler pipelineReportHandler =
new PipelineReportHandler(scmSafeModeManager, pipelineManager,
SCMContext.emptyContext(), conf);
@@ -508,7 +501,6 @@ public class TestPipelineManagerImpl {
TimeUnit.MILLISECONDS);
PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
- pipelineManager.allowPipelineCreation();
Pipeline pipeline = pipelineManager
.createPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
@@ -541,7 +533,6 @@ public class TestPipelineManagerImpl {
TimeUnit.MILLISECONDS);
PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
- pipelineManager.allowPipelineCreation();
Pipeline pipeline = pipelineManager
.createPipeline(HddsProtos.ReplicationType.RATIS,
HddsProtos.ReplicationFactor.THREE);
@@ -577,6 +568,9 @@ public class TestPipelineManagerImpl {
OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, -1,
TimeUnit.MILLISECONDS);
+ scmContext.updateSafeModeStatus(
+ new SCMSafeModeManager.SafeModeStatus(true, false));
+
PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
try {
pipelineManager.createPipeline(HddsProtos.ReplicationType.RATIS,
@@ -597,8 +591,8 @@ public class TestPipelineManagerImpl {
HddsProtos.ReplicationFactor.ONE).contains(pipeline));
// Simulate safemode check exiting.
- pipelineManager.onMessage(
- new SCMSafeModeManager.SafeModeStatus(true, true), null);
+ scmContext.updateSafeModeStatus(
+ new SCMSafeModeManager.SafeModeStatus(true, true));
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
@@ -616,17 +610,22 @@ public class TestPipelineManagerImpl {
TimeUnit.MILLISECONDS);
PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
+
+ scmContext.updateSafeModeStatus(
+ new SCMSafeModeManager.SafeModeStatus(true, false));
Assert.assertTrue(pipelineManager.getSafeModeStatus());
Assert.assertFalse(pipelineManager.isPipelineCreationAllowed());
+
// First pass pre-check as true, but safemode still on
- pipelineManager.onMessage(
- new SCMSafeModeManager.SafeModeStatus(true, true), null);
+ // Simulate safemode check exiting.
+ scmContext.updateSafeModeStatus(
+ new SCMSafeModeManager.SafeModeStatus(true, true));
Assert.assertTrue(pipelineManager.getSafeModeStatus());
Assert.assertTrue(pipelineManager.isPipelineCreationAllowed());
// Then also turn safemode off
- pipelineManager.onMessage(
- new SCMSafeModeManager.SafeModeStatus(false, true), null);
+ scmContext.updateSafeModeStatus(
+ new SCMSafeModeManager.SafeModeStatus(false, true));
Assert.assertFalse(pipelineManager.getSafeModeStatus());
Assert.assertTrue(pipelineManager.isPipelineCreationAllowed());
pipelineManager.close();
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
index 2226a43..24cb4b5 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
@@ -43,6 +43,7 @@ import org.apache.hadoop.hdds.scm.container.ContainerID;
import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
import org.apache.hadoop.hdds.scm.metadata.PipelineIDCodec;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
@@ -221,7 +222,8 @@ public class TestSCMPipelineManager {
SCMSafeModeManager scmSafeModeManager =
new SCMSafeModeManager(conf, new ArrayList<>(), pipelineManager,
- eventQueue);
+ eventQueue, new SCMServiceManager(),
+ SCMContext.emptyContext());
// create a pipeline in allocated state with no dns yet reported
Pipeline pipeline = pipelineManager
@@ -494,8 +496,10 @@ public class TestSCMPipelineManager {
pipelineManager.getPipeline(pipeline.getId()).getPipelineState());
SCMSafeModeManager scmSafeModeManager =
- new SCMSafeModeManager(new OzoneConfiguration(),
- new ArrayList<>(), pipelineManager, eventQueue);
+ new SCMSafeModeManager(new OzoneConfiguration(), new ArrayList<>(),
+ pipelineManager, eventQueue,
+ new SCMServiceManager(),
+ SCMContext.emptyContext());
PipelineReportHandler pipelineReportHandler =
new PipelineReportHandler(scmSafeModeManager, pipelineManager,
SCMContext.emptyContext(), conf);
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
index 19f1f30..e7dbeeb 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
@@ -33,6 +33,7 @@ import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
@@ -55,6 +56,8 @@ public class TestHealthyPipelineSafeModeRule {
public void testHealthyPipelineSafeModeRuleWithNoPipelines()
throws Exception {
EventQueue eventQueue = new EventQueue();
+ SCMServiceManager serviceManager = new SCMServiceManager();
+ SCMContext scmContext = SCMContext.emptyContext();
List<ContainerInfo> containers =
new ArrayList<>(HddsTestUtils.getContainerInfo(1));
@@ -79,14 +82,16 @@ public class TestHealthyPipelineSafeModeRule {
nodeManager,
scmMetadataStore.getPipelineTable(),
eventQueue,
- SCMContext.emptyContext());
+ scmContext,
+ serviceManager);
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
pipelineManager.getStateManager(), config);
pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
mockRatisProvider);
SCMSafeModeManager scmSafeModeManager = new SCMSafeModeManager(
- config, containers, pipelineManager, eventQueue);
+ config, containers, pipelineManager, eventQueue,
+ serviceManager, scmContext);
HealthyPipelineSafeModeRule healthyPipelineSafeModeRule =
scmSafeModeManager.getHealthyPipelineSafeModeRule();
@@ -105,6 +110,8 @@ public class TestHealthyPipelineSafeModeRule {
TestHealthyPipelineSafeModeRule.class.getName() + UUID.randomUUID());
EventQueue eventQueue = new EventQueue();
+ SCMServiceManager serviceManager = new SCMServiceManager();
+ SCMContext scmContext = SCMContext.emptyContext();
List<ContainerInfo> containers =
new ArrayList<>(HddsTestUtils.getContainerInfo(1));
@@ -129,8 +136,8 @@ public class TestHealthyPipelineSafeModeRule {
nodeManager,
scmMetadataStore.getPipelineTable(),
eventQueue,
- SCMContext.emptyContext());
- pipelineManager.allowPipelineCreation();
+ scmContext,
+ serviceManager);
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
@@ -163,7 +170,8 @@ public class TestHealthyPipelineSafeModeRule {
MockRatisPipelineProvider.markPipelineHealthy(pipeline3);
SCMSafeModeManager scmSafeModeManager = new SCMSafeModeManager(
- config, containers, pipelineManager, eventQueue);
+ config, containers, pipelineManager, eventQueue,
+ serviceManager, scmContext);
HealthyPipelineSafeModeRule healthyPipelineSafeModeRule =
scmSafeModeManager.getHealthyPipelineSafeModeRule();
@@ -199,6 +207,8 @@ public class TestHealthyPipelineSafeModeRule {
TestHealthyPipelineSafeModeRule.class.getName() + UUID.randomUUID());
EventQueue eventQueue = new EventQueue();
+ SCMServiceManager serviceManager = new SCMServiceManager();
+ SCMContext scmContext = SCMContext.emptyContext();
List<ContainerInfo> containers =
new ArrayList<>(HddsTestUtils.getContainerInfo(1));
@@ -224,9 +234,9 @@ public class TestHealthyPipelineSafeModeRule {
nodeManager,
scmMetadataStore.getPipelineTable(),
eventQueue,
- SCMContext.emptyContext());
+ scmContext,
+ serviceManager);
- pipelineManager.allowPipelineCreation();
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
pipelineManager.getStateManager(), config);
@@ -258,7 +268,8 @@ public class TestHealthyPipelineSafeModeRule {
MockRatisPipelineProvider.markPipelineHealthy(pipeline3);
SCMSafeModeManager scmSafeModeManager = new SCMSafeModeManager(
- config, containers, pipelineManager, eventQueue);
+ config, containers, pipelineManager, eventQueue,
+ serviceManager, scmContext);
HealthyPipelineSafeModeRule healthyPipelineSafeModeRule =
scmSafeModeManager.getHealthyPipelineSafeModeRule();
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
index b915899..6de81dc 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
@@ -34,6 +34,7 @@ import org.apache.hadoop.hdds.scm.container.MockNodeManager;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
@@ -62,6 +63,8 @@ public class TestOneReplicaPipelineSafeModeRule {
private OneReplicaPipelineSafeModeRule rule;
private PipelineManagerV2Impl pipelineManager;
private EventQueue eventQueue;
+ private SCMServiceManager serviceManager;
+ private SCMContext scmContext;
private MockNodeManager mockNodeManager;
private void setup(int nodes, int pipelineFactorThreeCount,
@@ -79,6 +82,8 @@ public class TestOneReplicaPipelineSafeModeRule {
mockNodeManager = new MockNodeManager(true, nodes);
eventQueue = new EventQueue();
+ serviceManager = new SCMServiceManager();
+ scmContext = SCMContext.emptyContext();
SCMMetadataStore scmMetadataStore =
new SCMMetadataStoreImpl(ozoneConfiguration);
@@ -89,8 +94,8 @@ public class TestOneReplicaPipelineSafeModeRule {
mockNodeManager,
scmMetadataStore.getPipelineTable(),
eventQueue,
- SCMContext.emptyContext());
- pipelineManager.allowPipelineCreation();
+ scmContext,
+ serviceManager);
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(mockNodeManager,
@@ -105,7 +110,7 @@ public class TestOneReplicaPipelineSafeModeRule {
SCMSafeModeManager scmSafeModeManager =
new SCMSafeModeManager(ozoneConfiguration, containers,
- pipelineManager, eventQueue);
+ pipelineManager, eventQueue, serviceManager, scmContext);
rule = scmSafeModeManager.getOneReplicaPipelineSafeModeRule();
}
diff --git a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
index e8dbc2e..523f964 100644
--- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
+++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
@@ -23,7 +23,6 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.UUID;
-import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
@@ -40,6 +39,7 @@ import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
import org.apache.hadoop.hdds.scm.ha.MockSCMHAManager;
import org.apache.hadoop.hdds.scm.ha.SCMContext;
+import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
@@ -47,7 +47,6 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManagerV2Impl;
-import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
@@ -73,6 +72,7 @@ public class TestSCMSafeModeManager {
private static EventQueue queue;
private SCMContext scmContext;
+ private SCMServiceManager serviceManager;
private SCMSafeModeManager scmSafeModeManager;
private static OzoneConfiguration config;
private List<ContainerInfo> containers = Collections.emptyList();
@@ -88,7 +88,8 @@ public class TestSCMSafeModeManager {
@Before
public void setUp() {
queue = new EventQueue();
- scmContext = SCMContext.emptyContext();
+ scmContext = new SCMContext.Builder().build();
+ serviceManager = new SCMServiceManager();
config = new OzoneConfiguration();
config.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION,
false);
@@ -120,7 +121,7 @@ public class TestSCMSafeModeManager {
@Test
public void testSafeModeStateWithNullContainers() {
new SCMSafeModeManager(config, Collections.emptyList(),
- null, queue);
+ null, queue, serviceManager, scmContext);
}
private void testSafeMode(int numContainers) throws Exception {
@@ -132,7 +133,7 @@ public class TestSCMSafeModeManager {
container.setState(HddsProtos.LifeCycleState.CLOSED);
}
scmSafeModeManager = new SCMSafeModeManager(
- config, containers, null, queue);
+ config, containers, null, queue, serviceManager, scmContext);
assertTrue(scmSafeModeManager.getInSafeMode());
queue.fireEvent(SCMEvents.NODE_REGISTRATION_CONT_REPORT,
@@ -155,59 +156,6 @@ public class TestSCMSafeModeManager {
}
@Test
- public void testDelayedEventNotification() throws Exception {
-
- List<SafeModeStatus> delayedSafeModeEvents = new ArrayList<>();
- List<SafeModeStatus> safeModeEvents = new ArrayList<>();
-
- //given
- EventQueue eventQueue = new EventQueue();
- eventQueue.addHandler(SCMEvents.SAFE_MODE_STATUS,
- (safeModeStatus, publisher) -> safeModeEvents.add(safeModeStatus));
- eventQueue.addHandler(SCMEvents.DELAYED_SAFE_MODE_STATUS,
- (safeModeStatus, publisher) -> delayedSafeModeEvents
- .add(safeModeStatus));
-
- OzoneConfiguration ozoneConfiguration = new OzoneConfiguration();
- ozoneConfiguration
- .setTimeDuration(HddsConfigKeys.HDDS_SCM_WAIT_TIME_AFTER_SAFE_MODE_EXIT,
- 3, TimeUnit.SECONDS);
- ozoneConfiguration
- .setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
-
- scmSafeModeManager = new SCMSafeModeManager(
- ozoneConfiguration, containers, null, eventQueue);
-
- //when
- scmSafeModeManager.setInSafeMode(true);
- scmSafeModeManager.setPreCheckComplete(true);
-
- scmSafeModeManager.emitSafeModeStatus();
- eventQueue.processAll(1000L);
-
- //then
- Assert.assertEquals(1, delayedSafeModeEvents.size());
- Assert.assertEquals(1, safeModeEvents.size());
-
- //when
- scmSafeModeManager.setInSafeMode(false);
- scmSafeModeManager.setPreCheckComplete(true);
-
- scmSafeModeManager.emitSafeModeStatus();
- eventQueue.processAll(1000L);
-
- //then
- Assert.assertEquals(2, safeModeEvents.size());
- //delayed messages are not yet sent (unless JVM is paused for 3 seconds)
- Assert.assertEquals(1, delayedSafeModeEvents.size());
-
- //event will be triggered after 3 seconds (see previous config)
- GenericTestUtils.waitFor(() -> delayedSafeModeEvents.size() == 2,
- 300,
- 6000);
-
- }
- @Test
public void testSafeModeExitRule() throws Exception {
containers = new ArrayList<>();
int numContainers = 100;
@@ -218,7 +166,7 @@ public class TestSCMSafeModeManager {
container.setState(HddsProtos.LifeCycleState.CLOSED);
}
scmSafeModeManager = new SCMSafeModeManager(
- config, containers, null, queue);
+ config, containers, null, queue, serviceManager, scmContext);
long cutOff = (long) Math.ceil(numContainers * config.getDouble(
HddsConfigKeys.HDDS_SCM_SAFEMODE_THRESHOLD_PCT,
@@ -311,9 +259,10 @@ public class TestSCMSafeModeManager {
mockNodeManager,
scmMetadataStore.getPipelineTable(),
queue,
- scmContext);
+ scmContext,
+ serviceManager);
scmSafeModeManager = new SCMSafeModeManager(
- conf, containers, pipelineManager, queue);
+ conf, containers, pipelineManager, queue, serviceManager, scmContext);
fail("testFailWithIncorrectValueForHealthyPipelinePercent");
} catch (IllegalArgumentException ex) {
GenericTestUtils.assertExceptionContains("value should be >= 0.0 and <=" +
@@ -335,9 +284,10 @@ public class TestSCMSafeModeManager {
mockNodeManager,
scmMetadataStore.getPipelineTable(),
queue,
- scmContext);
+ scmContext,
+ serviceManager);
scmSafeModeManager = new SCMSafeModeManager(
- conf, containers, pipelineManager, queue);
+ conf, containers, pipelineManager, queue, serviceManager, scmContext);
fail("testFailWithIncorrectValueForOneReplicaPipelinePercent");
} catch (IllegalArgumentException ex) {
GenericTestUtils.assertExceptionContains("value should be >= 0.0 and <=" +
@@ -358,9 +308,10 @@ public class TestSCMSafeModeManager {
mockNodeManager,
scmMetadataStore.getPipelineTable(),
queue,
- scmContext);
+ scmContext,
+ serviceManager);
scmSafeModeManager = new SCMSafeModeManager(
- conf, containers, pipelineManager, queue);
+ conf, containers, pipelineManager, queue, serviceManager, scmContext);
fail("testFailWithIncorrectValueForSafeModePercent");
} catch (IllegalArgumentException ex) {
GenericTestUtils.assertExceptionContains("value should be >= 0.0 and <=" +
@@ -388,13 +339,14 @@ public class TestSCMSafeModeManager {
mockNodeManager,
scmMetadataStore.getPipelineTable(),
queue,
- scmContext);
+ scmContext,
+ serviceManager);
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(mockNodeManager,
pipelineManager.getStateManager(), config);
pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
mockRatisProvider);
- pipelineManager.allowPipelineCreation();
+ pipelineManager.getBackgroundPipelineCreator().stop();
for (int i = 0; i < pipelineCount; i++) {
// Create pipeline
@@ -412,8 +364,8 @@ public class TestSCMSafeModeManager {
container.setState(HddsProtos.LifeCycleState.CLOSED);
}
- scmSafeModeManager = new SCMSafeModeManager(conf, containers,
- pipelineManager, queue);
+ scmSafeModeManager = new SCMSafeModeManager(
+ conf, containers, pipelineManager, queue, serviceManager, scmContext);
assertTrue(scmSafeModeManager.getInSafeMode());
testContainerThreshold(containers, 1.0);
@@ -523,9 +475,8 @@ public class TestSCMSafeModeManager {
OzoneConfiguration conf = new OzoneConfiguration(config);
conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_ENABLED, false);
PipelineManager pipelineManager = Mockito.mock(PipelineManager.class);
- Mockito.doNothing().when(pipelineManager).startPipelineCreator();
- scmSafeModeManager =
- new SCMSafeModeManager(conf, containers, pipelineManager, queue);
+ scmSafeModeManager = new SCMSafeModeManager(
+ conf, containers, pipelineManager, queue, serviceManager, scmContext);
assertFalse(scmSafeModeManager.getInSafeMode());
}
@@ -557,7 +508,7 @@ public class TestSCMSafeModeManager {
}
scmSafeModeManager = new SCMSafeModeManager(
- config, containers, null, queue);
+ config, containers, null, queue, serviceManager, scmContext);
assertTrue(scmSafeModeManager.getInSafeMode());
@@ -581,7 +532,7 @@ public class TestSCMSafeModeManager {
OzoneConfiguration conf = new OzoneConfiguration(config);
conf.setInt(HddsConfigKeys.HDDS_SCM_SAFEMODE_MIN_DATANODE, numOfDns);
scmSafeModeManager = new SCMSafeModeManager(
- conf, containers, null, queue);
+ conf, containers, null, queue, serviceManager, scmContext);
// Assert SCM is in Safe mode.
assertTrue(scmSafeModeManager.getInSafeMode());
@@ -639,14 +590,14 @@ public class TestSCMSafeModeManager {
nodeManager,
scmMetadataStore.getPipelineTable(),
queue,
- scmContext);
+ scmContext,
+ serviceManager);
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
pipelineManager.getStateManager(), config);
pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
mockRatisProvider);
- pipelineManager.allowPipelineCreation();
Pipeline pipeline = pipelineManager.createPipeline(
HddsProtos.ReplicationType.RATIS,
@@ -656,7 +607,8 @@ public class TestSCMSafeModeManager {
MockRatisPipelineProvider.markPipelineHealthy(pipeline);
scmSafeModeManager = new SCMSafeModeManager(
- config, containers, pipelineManager, queue);
+ config, containers, pipelineManager, queue, serviceManager,
+ scmContext);
queue.fireEvent(SCMEvents.NODE_REGISTRATION_CONT_REPORT,
HddsTestUtils.createNodeRegistrationContainerReport(containers));
@@ -703,7 +655,8 @@ public class TestSCMSafeModeManager {
nodeManager,
scmMetadataStore.getPipelineTable(),
queue,
- scmContext);
+ scmContext,
+ serviceManager);
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
@@ -714,7 +667,7 @@ public class TestSCMSafeModeManager {
SafeModeEventHandler smHandler = new SafeModeEventHandler();
queue.addHandler(SCMEvents.SAFE_MODE_STATUS, smHandler);
scmSafeModeManager = new SCMSafeModeManager(
- config, containers, pipelineManager, queue);
+ config, containers, pipelineManager, queue, serviceManager, scmContext);
// Assert SCM is in Safe mode.
assertTrue(scmSafeModeManager.getInSafeMode());
@@ -739,9 +692,6 @@ public class TestSCMSafeModeManager {
Assert.assertEquals(true, smHandler.getPreCheckComplete());
Assert.assertEquals(true, smHandler.getIsInSafeMode());
- // Create a pipeline and ensure safemode is exited.
- pipelineManager.allowPipelineCreation();
-
/* There is a race condition where the background pipeline creation
* task creates the pipeline before the following create call.
* So wrapping it with try..catch.
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
index 26852f0..657f5db 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestRatisPipelineCreateAndDestroy.java
@@ -21,6 +21,7 @@ import org.apache.hadoop.hdds.HddsConfigKeys;
import org.apache.hadoop.hdds.conf.OzoneConfiguration;
import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.ha.SCMService.Event;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.ozone.HddsDatanodeService;
@@ -160,7 +161,9 @@ public class TestRatisPipelineCreateAndDestroy {
.getScmNodeManager().getNodeCount(NodeStatus.inServiceHealthy()) >=
HddsProtos.ReplicationFactor.THREE.getNumber()) {
// make sure pipelines is created after node start
- pipelineManager.triggerPipelineCreation();
+ cluster.getStorageContainerManager()
+ .getSCMServiceManager()
+ .notifyEventTriggered(Event.PRE_CHECK_COMPLETED);
waitForPipelines(1);
}
}
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
index 2a468f5..635fe30 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestStorageContainerManager.java
@@ -73,6 +73,7 @@ import org.apache.hadoop.hdds.scm.container.ReplicationManager;
import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline;
import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.exceptions.SCMException;
+import org.apache.hadoop.hdds.scm.ha.SCMContext;
import org.apache.hadoop.hdds.scm.node.DatanodeInfo;
import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.node.NodeStatus;
@@ -647,10 +648,16 @@ public class TestStorageContainerManager {
dnUuid, closeContainerCommand);
GenericTestUtils.waitFor(() -> {
- return replicationManager.isRunning();
+ SCMContext scmContext
+ = cluster.getStorageContainerManager().getScmContext();
+ return !scmContext.isInSafeMode() && scmContext.isLeader();
}, 1000, 25000);
+ // After safe mode is off, ReplicationManager starts to run with a delay.
+ Thread.sleep(5000);
// Give ReplicationManager some time to process the containers.
+ cluster.getStorageContainerManager()
+ .getReplicationManager().processContainersNow();
Thread.sleep(5000);
verify(publisher).fireEvent(eq(SCMEvents.DATANODE_COMMAND), argThat(new
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java
index 5e08f2d..69f07df 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/TestContainerReplication.java
@@ -141,7 +141,7 @@ public class TestContainerReplication {
SCMCommand<?> command = new ReplicateContainerCommand(containerId,
sourcePipelines.getNodes());
command.setTerm(
- cluster.getStorageContainerManager().getScmContext().getTerm());
+ cluster.getStorageContainerManager().getScmContext().getTermOfLeader());
cluster.getStorageContainerManager().getScmNodeManager()
.addDatanodeCommand(destinationDatanode.getDatanodeDetails().getUuid(),
command);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
index 00a338b..b45603a 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerByPipeline.java
@@ -147,7 +147,7 @@ public class TestCloseContainerByPipeline {
SCMCommand<?> command = new CloseContainerCommand(
containerID, pipeline.getId());
command.setTerm(
- cluster.getStorageContainerManager().getScmContext().getTerm());
+ cluster.getStorageContainerManager().getScmContext().getTermOfLeader());
cluster.getStorageContainerManager().getScmNodeManager()
.addDatanodeCommand(datanodeDetails.getUuid(), command);
GenericTestUtils
@@ -198,7 +198,7 @@ public class TestCloseContainerByPipeline {
SCMCommand<?> command = new CloseContainerCommand(
containerID, pipeline.getId());
command.setTerm(
- cluster.getStorageContainerManager().getScmContext().getTerm());
+ cluster.getStorageContainerManager().getScmContext().getTermOfLeader());
cluster.getStorageContainerManager().getScmNodeManager()
.addDatanodeCommand(datanodeDetails.getUuid(), command);
@@ -251,8 +251,8 @@ public class TestCloseContainerByPipeline {
//send the order to close the container
SCMCommand<?> command = new CloseContainerCommand(
containerID, pipeline.getId());
- command.setTerm(
- cluster.getStorageContainerManager().getScmContext().getTerm());
+ command.setTerm(cluster.getStorageContainerManager()
+ .getScmContext().getTermOfLeader());
cluster.getStorageContainerManager().getScmNodeManager()
.addDatanodeCommand(details.getUuid(), command);
int index = cluster.getHddsDatanodeIndex(details);
@@ -332,7 +332,7 @@ public class TestCloseContainerByPipeline {
SCMCommand<?> command = new CloseContainerCommand(
containerID, pipeline.getId(), true);
command.setTerm(
- cluster.getStorageContainerManager().getScmContext().getTerm());
+ cluster.getStorageContainerManager().getScmContext().getTermOfLeader());
cluster.getStorageContainerManager().getScmNodeManager()
.addDatanodeCommand(datanodeDetails.getUuid(), command);
GenericTestUtils
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java
index 870486a..c165f92 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestCloseContainerHandler.java
@@ -123,7 +123,7 @@ public class TestCloseContainerHandler {
SCMCommand<?> command = new CloseContainerCommand(
containerId.getId(), pipeline.getId());
command.setTerm(
- cluster.getStorageContainerManager().getScmContext().getTerm());
+ cluster.getStorageContainerManager().getScmContext().getTermOfLeader());
cluster.getStorageContainerManager().getScmNodeManager()
.addDatanodeCommand(datanodeDetails.getUuid(), command);
diff --git a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerHandler.java b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerHandler.java
index 40998e1..ab20505 100644
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerHandler.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/container/common/statemachine/commandhandler/TestDeleteContainerHandler.java
@@ -136,7 +136,7 @@ public class TestDeleteContainerHandler {
SCMCommand<?> command = new CloseContainerCommand(
containerId.getId(), pipeline.getId());
command.setTerm(
- cluster.getStorageContainerManager().getScmContext().getTerm());
+ cluster.getStorageContainerManager().getScmContext().getTermOfLeader());
nodeManager.addDatanodeCommand(datanodeDetails.getUuid(), command);
GenericTestUtils.waitFor(() ->
@@ -154,7 +154,7 @@ public class TestDeleteContainerHandler {
// send delete container to the datanode
command = new DeleteContainerCommand(containerId.getId(), false);
command.setTerm(
- cluster.getStorageContainerManager().getScmContext().getTerm());
+ cluster.getStorageContainerManager().getScmContext().getTermOfLeader());
nodeManager.addDatanodeCommand(datanodeDetails.getUuid(), command);
GenericTestUtils.waitFor(() ->
@@ -192,7 +192,7 @@ public class TestDeleteContainerHandler {
SCMCommand<?> command = new DeleteContainerCommand(
containerId.getId(), false);
command.setTerm(
- cluster.getStorageContainerManager().getScmContext().getTerm());
+ cluster.getStorageContainerManager().getScmContext().getTermOfLeader());
nodeManager.addDatanodeCommand(datanodeDetails.getUuid(), command);
// Here it should not delete it, and the container should exist in the
@@ -216,7 +216,7 @@ public class TestDeleteContainerHandler {
// container
command = new DeleteContainerCommand(containerId.getId(), true);
command.setTerm(
- cluster.getStorageContainerManager().getScmContext().getTerm());
+ cluster.getStorageContainerManager().getScmContext().getTermOfLeader());
nodeManager.addDatanodeCommand(datanodeDetails.getUuid(), command);
GenericTestUtils.waitFor(() ->
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]