This is an automated email from the ASF dual-hosted git repository.
sammichen pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/ozone.git
The following commit(s) were added to refs/heads/master by this push:
new f922b0d HDDS-5555. remove pipeline manager v1 code (#2511)
f922b0d is described below
commit f922b0df3fe1f9afb7972cf0baef18ffe7b941f3
Author: Jackson Yao <[email protected]>
AuthorDate: Tue Aug 10 15:01:41 2021 +0800
HDDS-5555. remove pipeline manager v1 code (#2511)
---
...ManagerV2Impl.java => PipelineManagerImpl.java} | 22 +-
.../hdds/scm/pipeline/SCMPipelineManager.java | 801 --------------------
.../hdds/scm/server/StorageContainerManager.java | 4 +-
.../hadoop/hdds/scm/block/TestBlockManager.java | 6 +-
.../container/TestCloseContainerEventHandler.java | 6 +-
.../scm/container/TestSCMContainerManager.java | 6 +-
.../hdds/scm/node/TestContainerPlacement.java | 4 +-
.../hadoop/hdds/scm/node/TestDeadNodeHandler.java | 6 +-
.../hdds/scm/pipeline/TestPipelineManagerImpl.java | 40 +-
.../hdds/scm/pipeline/TestSCMPipelineManager.java | 821 ---------------------
.../safemode/TestHealthyPipelineSafeModeRule.java | 14 +-
.../TestOneReplicaPipelineSafeModeRule.java | 6 +-
.../hdds/scm/safemode/TestSCMSafeModeManager.java | 20 +-
.../ozone/recon/scm/ReconPipelineManager.java | 4 +-
14 files changed, 69 insertions(+), 1691 deletions(-)
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
similarity index 96%
rename from
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
rename to
hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
index a0f7e0f..18bbcbc 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerV2Impl.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
@@ -64,9 +64,9 @@ import java.util.concurrent.locks.ReentrantLock;
* All the write operations for pipelines must come via PipelineManager.
* It synchronises all write and read operations via a ReadWriteLock.
*/
-public class PipelineManagerV2Impl implements PipelineManager {
+public class PipelineManagerImpl implements PipelineManager {
private static final Logger LOG =
- LoggerFactory.getLogger(PipelineManagerV2Impl.class);
+ LoggerFactory.getLogger(PipelineManagerImpl.class);
// Limit the number of on-going ratis operation to be 1.
private final Lock lock;
@@ -86,13 +86,13 @@ public class PipelineManagerV2Impl implements
PipelineManager {
// SCM is already out of SafeMode.
private AtomicBoolean freezePipelineCreation;
- protected PipelineManagerV2Impl(ConfigurationSource conf,
- SCMHAManager scmhaManager,
- NodeManager nodeManager,
- StateManager pipelineStateManager,
- PipelineFactory pipelineFactory,
- EventPublisher eventPublisher,
- SCMContext scmContext) {
+ protected PipelineManagerImpl(ConfigurationSource conf,
+ SCMHAManager scmhaManager,
+ NodeManager nodeManager,
+ StateManager pipelineStateManager,
+ PipelineFactory pipelineFactory,
+ EventPublisher eventPublisher,
+ SCMContext scmContext) {
this.lock = new ReentrantLock();
this.pipelineFactory = pipelineFactory;
this.stateManager = pipelineStateManager;
@@ -111,7 +111,7 @@ public class PipelineManagerV2Impl implements
PipelineManager {
this.freezePipelineCreation = new AtomicBoolean();
}
- public static PipelineManagerV2Impl newPipelineManager(
+ public static PipelineManagerImpl newPipelineManager(
ConfigurationSource conf,
SCMHAManager scmhaManager,
NodeManager nodeManager,
@@ -132,7 +132,7 @@ public class PipelineManagerV2Impl implements
PipelineManager {
nodeManager, stateManager, conf, eventPublisher, scmContext);
// Create PipelineManager
- PipelineManagerV2Impl pipelineManager = new PipelineManagerV2Impl(conf,
+ PipelineManagerImpl pipelineManager = new PipelineManagerImpl(conf,
scmhaManager, nodeManager, stateManager, pipelineFactory,
eventPublisher, scmContext);
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
deleted file mode 100644
index 31b8115..0000000
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ /dev/null
@@ -1,801 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdds.scm.pipeline;
-
-import javax.management.ObjectName;
-import java.io.IOException;
-import java.time.Duration;
-import java.time.Instant;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableSet;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hdds.HddsConfigKeys;
-import org.apache.hadoop.hdds.client.RatisReplicationConfig;
-import org.apache.hadoop.hdds.client.ReplicationConfig;
-import org.apache.hadoop.hdds.conf.ConfigurationSource;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.apache.hadoop.hdds.scm.ha.SCMContext;
-import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
-import org.apache.hadoop.hdds.server.events.EventHandler;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.hdds.utils.Scheduler;
-import org.apache.hadoop.hdds.utils.db.Table;
-import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
-import org.apache.hadoop.hdds.utils.db.TableIterator;
-import org.apache.hadoop.metrics2.util.MBeans;
-import org.apache.hadoop.util.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static
org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE;
-
-/**
- * Implements api needed for management of pipelines. All the write operations
- * for pipelines must come via PipelineManager. It synchronises all write
- * and read operations via a ReadWriteLock.
- */
-public class SCMPipelineManager implements
- PipelineManager, EventHandler<SafeModeStatus> {
-
- private static final Logger LOG =
- LoggerFactory.getLogger(SCMPipelineManager.class);
-
- private final ReadWriteLock lock;
- private PipelineFactory pipelineFactory;
- private StateManager stateManager;
- private final BackgroundPipelineCreator backgroundPipelineCreator;
- private Scheduler scheduler;
-
- private final EventPublisher eventPublisher;
- private final NodeManager nodeManager;
- private final SCMPipelineMetrics metrics;
- private final ConfigurationSource conf;
- private long pipelineWaitDefaultTimeout;
- // Pipeline Manager MXBean
- private ObjectName pmInfoBean;
-
- private Table<PipelineID, Pipeline> pipelineStore;
-
- private final AtomicBoolean isInSafeMode;
- // Used to track if the safemode pre-checks have completed. This is designed
- // to prevent pipelines being created until sufficient nodes have registered.
- private final AtomicBoolean pipelineCreationAllowed;
-
- // This allows for freezing/resuming the new pipeline creation while the
- // SCM is already out of SafeMode.
- private AtomicBoolean freezePipelineCreation;
-
- public SCMPipelineManager(ConfigurationSource conf,
- NodeManager nodeManager,
- Table<PipelineID, Pipeline> pipelineStore,
- EventPublisher eventPublisher)
- throws IOException {
- this(conf, nodeManager, pipelineStore, eventPublisher, null, null);
- this.stateManager = new PipelineStateManager();
- this.pipelineFactory = new PipelineFactory(nodeManager,
- stateManager, conf, eventPublisher, SCMContext.emptyContext());
- this.pipelineStore = pipelineStore;
- initializePipelineState();
- }
-
- protected SCMPipelineManager(ConfigurationSource conf,
- NodeManager nodeManager,
- Table<PipelineID, Pipeline> pipelineStore,
- EventPublisher eventPublisher,
- PipelineStateManager pipelineStateManager,
- PipelineFactory pipelineFactory)
- throws IOException {
- this.lock = new ReentrantReadWriteLock();
- this.pipelineStore = pipelineStore;
- this.conf = conf;
- this.pipelineFactory = pipelineFactory;
- this.stateManager = pipelineStateManager;
- // TODO: See if thread priority needs to be set for these threads
- scheduler = new Scheduler("RatisPipelineUtilsThread", false, 1);
- this.backgroundPipelineCreator =
- new BackgroundPipelineCreator(this, scheduler, conf);
- this.eventPublisher = eventPublisher;
- this.nodeManager = nodeManager;
- this.metrics = SCMPipelineMetrics.create();
- this.pmInfoBean = MBeans.register("SCMPipelineManager",
- "SCMPipelineManagerInfo", this);
- this.pipelineWaitDefaultTimeout = conf.getTimeDuration(
- HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL,
- HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL_DEFAULT,
- TimeUnit.MILLISECONDS);
- this.isInSafeMode = new AtomicBoolean(conf.getBoolean(
- HddsConfigKeys.HDDS_SCM_SAFEMODE_ENABLED,
- HddsConfigKeys.HDDS_SCM_SAFEMODE_ENABLED_DEFAULT));
- // Pipeline creation is only allowed after the safemode prechecks have
- // passed, eg sufficient nodes have registered.
- this.pipelineCreationAllowed = new AtomicBoolean(!this.isInSafeMode.get());
- // controls freezing/resuming pipeline creation regardless of SafeMode
- // status.
- this.freezePipelineCreation = new AtomicBoolean(false);
- }
-
- public StateManager getStateManager() {
- return stateManager;
- }
-
- @VisibleForTesting
- public void setPipelineProvider(ReplicationType replicationType,
- PipelineProvider provider) {
- pipelineFactory.setProvider(replicationType, provider);
- }
-
- @VisibleForTesting
- public void allowPipelineCreation() {
- this.pipelineCreationAllowed.set(true);
- }
-
- @VisibleForTesting
- public boolean isPipelineCreationAllowed() {
- return pipelineCreationAllowed.get();
- }
-
- protected void initializePipelineState() throws IOException {
- if (pipelineStore.isEmpty()) {
- LOG.info("No pipeline exists in current db");
- return;
- }
- TableIterator<PipelineID, ? extends KeyValue<PipelineID, Pipeline>>
- iterator = pipelineStore.iterator();
- while (iterator.hasNext()) {
- Pipeline pipeline = nextPipelineFromIterator(iterator);
- stateManager.addPipeline(pipeline);
- nodeManager.addPipeline(pipeline);
- }
- }
-
- private Pipeline nextPipelineFromIterator(
- TableIterator<PipelineID, ? extends KeyValue<PipelineID, Pipeline>> it
- ) throws IOException {
- KeyValue<PipelineID, Pipeline> actual = it.next();
- Pipeline pipeline = actual.getValue();
- PipelineID pipelineID = actual.getKey();
- checkKeyAndReplaceIfObsolete(it, pipeline, pipelineID);
- return pipeline;
- }
-
- /**
- * This method is part of the change that happens in HDDS-3925, and we can
- * and should remove this on later on.
- * The purpose of the change is to get rid of protobuf serialization in the
- * SCM database Pipeline table keys. The keys are not used anywhere, and the
- * PipelineID that is used as a key is in the value as well, so we can detect
- * a change in the key translation to byte[] and if we have the old format
- * we refresh the table contents during SCM startup.
- *
- * If this fails in the remove, then there is an IOException coming from
- * RocksDB itself, in this case in memory structures will still be fine and
- * SCM should be operational, however we will attempt to replace the old key
- * at next startup. In this case removing of the pipeline will leave the
- * pipeline in RocksDB, and during next startup we will attempt to delete it
- * again. This does not affect any runtime operations.
- * If a Pipeline should have been deleted but remained in RocksDB, then at
- * next startup it will be replaced and added with the new key, then SCM will
- * detect that it is an invalid Pipeline and successfully delete it with the
- * new key.
- * For further info check the JIRA.
- *
- * @param it the iterator used to iterate the Pipeline table
- * @param pipeline the pipeline read already from the iterator
- * @param pipelineID the pipeline ID read from the raw data via the iterator
- */
- private void checkKeyAndReplaceIfObsolete(
- TableIterator<PipelineID, ? extends KeyValue<PipelineID, Pipeline>> it,
- Pipeline pipeline,
- PipelineID pipelineID
- ) {
- if (!pipelineID.equals(pipeline.getId())) {
- try {
- LOG.info("Found pipeline in old format key : {}", pipeline.getId());
- it.removeFromDB();
- pipelineStore.put(pipeline.getId(), pipeline);
- } catch (IOException e) {
- LOG.info("Pipeline table in RocksDB has an old key format, and "
- + "removing the pipeline with the old key was unsuccessful."
- + "Pipeline: {}", pipeline);
- }
- }
- }
-
- private void recordMetricsForPipeline(Pipeline pipeline) {
- metrics.incNumPipelineAllocated();
- if (pipeline.isOpen()) {
- metrics.incNumPipelineCreated();
- metrics.createPerPipelineMetrics(pipeline);
- }
- switch (pipeline.getType()) {
- case STAND_ALONE:
- return;
- case RATIS:
- List<Pipeline> overlapPipelines = RatisPipelineUtils
- .checkPipelineContainSameDatanodes(stateManager, pipeline);
- if (!overlapPipelines.isEmpty()) {
- // Count 1 overlap at a time.
- metrics.incNumPipelineContainSameDatanodes();
- //TODO remove until pipeline allocation is proved equally distributed.
- for (Pipeline overlapPipeline : overlapPipelines) {
- LOG.info("Pipeline: " + pipeline.getId().toString() +
- " contains same datanodes as previous pipelines: " +
- overlapPipeline.getId().toString() + " nodeIds: " +
- pipeline.getNodes().get(0).getUuid().toString() +
- ", " + pipeline.getNodes().get(1).getUuid().toString() +
- ", " + pipeline.getNodes().get(2).getUuid().toString());
- }
- }
- return;
- case CHAINED:
- // Not supported.
- default:
- // Not supported.
- return;
- }
- }
-
- @Override
- public Pipeline createPipeline(ReplicationConfig replicationConfig)
- throws IOException {
- if (!isPipelineCreationAllowed()
- && replicationConfig.getRequiredNodes() != 1) {
- LOG.debug("Pipeline creation is not allowed until safe mode prechecks " +
- "complete");
- throw new IOException("Pipeline creation is not allowed as safe mode " +
- "prechecks have not yet passed");
- }
- if (freezePipelineCreation.get()) {
- LOG.debug("Pipeline creation is frozen while an upgrade is in " +
- "progress");
- throw new IOException("Pipeline creation is frozen while an upgrade " +
- "is in progress");
- }
- lock.writeLock().lock();
- try {
- Pipeline pipeline = pipelineFactory.create(replicationConfig);
- if (pipelineStore != null) {
- pipelineStore.put(pipeline.getId(), pipeline);
- }
- stateManager.addPipeline(pipeline);
- nodeManager.addPipeline(pipeline);
- recordMetricsForPipeline(pipeline);
- return pipeline;
- } catch (IOException ex) {
- if (ex instanceof SCMException &&
- ((SCMException) ex).getResult() == FAILED_TO_FIND_SUITABLE_NODE) {
- // Avoid spam SCM log with errors when SCM has enough open pipelines
- LOG.debug("Can't create more pipelines of replicationConfig: {}. " +
- "Reason: {}", replicationConfig, ex.getMessage());
- } else {
- LOG.error("Failed to create pipeline of replicationConfig: {}. " +
- "Exception: {}", replicationConfig, ex.getMessage());
- }
- metrics.incNumPipelineCreationFailed();
- throw ex;
- } finally {
- lock.writeLock().unlock();
- }
- }
-
- @Override
- public Pipeline createPipeline(ReplicationConfig replicationConfig,
- List<DatanodeDetails> nodes) {
- // This will mostly be used to create dummy pipeline for SimplePipelines.
- // We don't update the metrics for SimplePipelines.
- lock.writeLock().lock();
- try {
- return pipelineFactory.create(replicationConfig, nodes);
- } finally {
- lock.writeLock().unlock();
- }
- }
-
- @Override
- public Pipeline getPipeline(PipelineID pipelineID)
- throws PipelineNotFoundException {
- lock.readLock().lock();
- try {
- return stateManager.getPipeline(pipelineID);
- } finally {
- lock.readLock().unlock();
- }
- }
-
- @Override
- public boolean containsPipeline(PipelineID pipelineID) {
- lock.readLock().lock();
- try {
- getPipeline(pipelineID);
- return true;
- } catch (PipelineNotFoundException e) {
- return false;
- } finally {
- lock.readLock().unlock();
- }
- }
-
- @Override
- public List<Pipeline> getPipelines() {
- lock.readLock().lock();
- try {
- return stateManager.getPipelines();
- } finally {
- lock.readLock().unlock();
- }
- }
-
- @Override
- public List<Pipeline> getPipelines(ReplicationConfig replicationConfig) {
- lock.readLock().lock();
- try {
- return stateManager.getPipelines(replicationConfig);
- } finally {
- lock.readLock().unlock();
- }
- }
-
- @Override
- public List<Pipeline> getPipelines(ReplicationConfig replicationConfig,
- Pipeline.PipelineState state) {
- lock.readLock().lock();
- try {
- return stateManager.getPipelines(replicationConfig, state);
- } finally {
- lock.readLock().unlock();
- }
- }
-
- @Override
- public List<Pipeline> getPipelines(ReplicationConfig replicationConfig,
- Pipeline.PipelineState state,
- Collection<DatanodeDetails> excludeDns,
- Collection<PipelineID> excludePipelines) {
- lock.readLock().lock();
- try {
- return stateManager
- .getPipelines(replicationConfig, state, excludeDns,
excludePipelines);
- } finally {
- lock.readLock().unlock();
- }
- }
-
- @Override
- public void addContainerToPipeline(PipelineID pipelineID,
- ContainerID containerID) throws IOException {
- lock.writeLock().lock();
- try {
- stateManager.addContainerToPipeline(pipelineID, containerID);
- } finally {
- lock.writeLock().unlock();
- }
- }
-
- private void updatePipelineStateInDb(PipelineID pipelineId,
- Pipeline.PipelineState oldState)
- throws IOException {
- // null check is here to prevent the case where SCM store
- // is closed but the staleNode handlers/pipleine creations
- // still try to access it.
- if (pipelineStore != null) {
- try {
- pipelineStore.put(pipelineId, getPipeline(pipelineId));
- } catch (IOException ex) {
- LOG.warn("Pipeline {} state update failed", pipelineId);
- // revert back to old state in memory
- stateManager.updatePipelineState(pipelineId, oldState);
- }
- }
- }
-
- @Override
- public void removeContainerFromPipeline(PipelineID pipelineID,
- ContainerID containerID) throws IOException {
- lock.writeLock().lock();
- try {
- stateManager.removeContainerFromPipeline(pipelineID, containerID);
- } finally {
- lock.writeLock().unlock();
- }
- }
-
- @Override
- public NavigableSet<ContainerID> getContainersInPipeline(
- PipelineID pipelineID) throws IOException {
- lock.readLock().lock();
- try {
- return stateManager.getContainers(pipelineID);
- } finally {
- lock.readLock().unlock();
- }
- }
-
- @Override
- public int getNumberOfContainers(PipelineID pipelineID) throws IOException {
- return stateManager.getNumberOfContainers(pipelineID);
- }
-
- @Override
- public void openPipeline(PipelineID pipelineId) throws IOException {
- lock.writeLock().lock();
- try {
- Pipeline.PipelineState state = stateManager.
- getPipeline(pipelineId).getPipelineState();
- Pipeline pipeline = stateManager.openPipeline(pipelineId);
- updatePipelineStateInDb(pipelineId, state);
- metrics.incNumPipelineCreated();
- metrics.createPerPipelineMetrics(pipeline);
- } finally {
- lock.writeLock().unlock();
- }
- }
-
- /**
- * Fire events to close all containers related to the input pipeline.
- * @param pipelineId - ID of the pipeline.
- * @throws IOException
- */
- protected void closeContainersForPipeline(final PipelineID pipelineId)
- throws IOException {
- Set<ContainerID> containerIDs = stateManager.getContainers(pipelineId);
- for (ContainerID containerID : containerIDs) {
- eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerID);
- }
- }
-
- /**
- * put pipeline in CLOSED state.
- * @param pipeline - ID of the pipeline.
- * @param onTimeout - whether to remove pipeline after some time.
- * @throws IOException
- */
- @Override
- public void closePipeline(Pipeline pipeline, boolean onTimeout)
- throws IOException {
- PipelineID pipelineID = pipeline.getId();
- lock.writeLock().lock();
- try {
- if (!pipeline.isClosed()) {
- stateManager.updatePipelineState(pipelineID,
- Pipeline.PipelineState.CLOSED);
- LOG.info("Pipeline {} moved to CLOSED state", pipeline);
- }
- metrics.removePipelineMetrics(pipelineID);
- } finally {
- lock.writeLock().unlock();
- }
- // close containers.
- closeContainersForPipeline(pipelineID);
- if (!onTimeout) {
- // close pipeline right away.
- removePipeline(pipeline);
- }
- }
-
- /**
- * Scrub pipelines.
- */
- @Override
- public void scrubPipeline(ReplicationConfig replicationConfig)
- throws IOException {
- if (!RatisReplicationConfig.hasFactor(replicationConfig,
- ReplicationFactor.THREE)) {
- // Only srub pipeline for RATIS THREE pipeline
- return;
- }
- Instant currentTime = Instant.now();
- Long pipelineScrubTimeoutInMills = conf.getTimeDuration(
- ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT,
- ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT_DEFAULT,
- TimeUnit.MILLISECONDS);
-
- List<Pipeline> candidates = stateManager.getPipelines(replicationConfig);
-
- for (Pipeline p : candidates) {
- // scrub pipelines who stay ALLOCATED for too long.
- if (p.getPipelineState() == Pipeline.PipelineState.ALLOCATED &&
- (currentTime.toEpochMilli() - p.getCreationTimestamp()
- .toEpochMilli() >= pipelineScrubTimeoutInMills)) {
- LOG.info("Scrubbing pipeline: id: " + p.getId().toString() +
- " since it stays at ALLOCATED stage for " +
- Duration.between(currentTime, p.getCreationTimestamp())
- .toMinutes() + " mins.");
- closePipeline(p, false);
- }
- // scrub pipelines who stay CLOSED for too long.
- if (p.getPipelineState() == Pipeline.PipelineState.CLOSED) {
- LOG.info("Scrubbing pipeline: id: " + p.getId().toString() +
- " since it is at CLOSED stage.");
- closeContainersForPipeline(p.getId());
- removePipeline(p);
- }
- }
- return;
- }
-
- @Override
- public Map<String, Integer> getPipelineInfo() {
- final Map<String, Integer> pipelineInfo = new HashMap<>();
- for (Pipeline.PipelineState state : Pipeline.PipelineState.values()) {
- pipelineInfo.put(state.toString(), 0);
- }
- stateManager.getPipelines().forEach(pipeline ->
- pipelineInfo.computeIfPresent(
- pipeline.getPipelineState().toString(), (k, v) -> v + 1));
- return pipelineInfo;
- }
-
- /**
- * Schedules a fixed interval job to create pipelines.
- */
- @Override
- public void startPipelineCreator() {
- backgroundPipelineCreator.startFixedIntervalPipelineCreator();
- }
-
- /**
- * Triggers pipeline creation after the specified time.
- */
- @Override
- public void triggerPipelineCreation() {
- backgroundPipelineCreator.triggerPipelineCreation();
- }
-
- /**
- * Activates a dormant pipeline.
- *
- * @param pipelineID ID of the pipeline to activate.
- * @throws IOException in case of any Exception
- */
- @Override
- public void activatePipeline(PipelineID pipelineID)
- throws IOException {
- lock.writeLock().lock();
- try {
- Pipeline.PipelineState state = stateManager.
- getPipeline(pipelineID).getPipelineState();
- stateManager.activatePipeline(pipelineID);
- updatePipelineStateInDb(pipelineID, state);
- } finally {
- lock.writeLock().unlock();
- }
- }
-
- /**
- * Deactivates an active pipeline.
- *
- * @param pipelineID ID of the pipeline to deactivate.
- * @throws IOException in case of any Exception
- */
- @Override
- public void deactivatePipeline(PipelineID pipelineID)
- throws IOException {
- lock.writeLock().lock();
- try {
- Pipeline.PipelineState state = stateManager.
- getPipeline(pipelineID).getPipelineState();
- stateManager.deactivatePipeline(pipelineID);
- updatePipelineStateInDb(pipelineID, state);
- } finally {
- lock.writeLock().unlock();
- }
- }
-
- /**
- * Wait a pipeline to be OPEN.
- *
- * @param pipelineID ID of the pipeline to wait for.
- * @param timeout wait timeout, millisecond, 0 to use default value
- * @throws IOException in case of any Exception, such as timeout
- */
- @Override
- public void waitPipelineReady(PipelineID pipelineID, long timeout)
- throws IOException {
- long st = Time.monotonicNow();
- if (timeout == 0) {
- timeout = pipelineWaitDefaultTimeout;
- }
-
- boolean ready;
- Pipeline pipeline;
- do {
- try {
- pipeline = stateManager.getPipeline(pipelineID);
- } catch (PipelineNotFoundException e) {
- throw new PipelineNotFoundException(String.format(
- "Pipeline %s cannot be found", pipelineID));
- }
- ready = pipeline.isOpen();
- if (!ready) {
- try {
- Thread.sleep((long)100);
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
- }
- } while (!ready && Time.monotonicNow() - st < timeout);
-
- if (!ready) {
- throw new IOException(String.format("Pipeline %s is not ready in %d ms",
- pipelineID, timeout));
- }
- }
-
- /**
- * Removes the pipeline from the db and pipeline state map.
- *
- * @param pipeline - pipeline to be removed
- * @throws IOException
- */
- protected void removePipeline(Pipeline pipeline) throws IOException {
- pipelineFactory.close(pipeline.getType(), pipeline);
- PipelineID pipelineID = pipeline.getId();
- lock.writeLock().lock();
- try {
- if (pipelineStore != null) {
- pipelineStore.delete(pipelineID);
- Pipeline pipelineRemoved = stateManager.removePipeline(pipelineID);
- nodeManager.removePipeline(pipelineRemoved);
- metrics.incNumPipelineDestroyed();
- }
- } catch (IOException ex) {
- metrics.incNumPipelineDestroyFailed();
- throw ex;
- } finally {
- lock.writeLock().unlock();
- }
- }
-
- @Override
- public void incNumBlocksAllocatedMetric(PipelineID id) {
- metrics.incNumBlocksAllocated(id);
- }
-
- @Override
- public void close() throws IOException {
- if (scheduler != null) {
- scheduler.close();
- scheduler = null;
- }
-
- if(pmInfoBean != null) {
- MBeans.unregister(this.pmInfoBean);
- pmInfoBean = null;
- }
-
- SCMPipelineMetrics.unRegister();
-
- // shutdown pipeline provider.
- pipelineFactory.shutdown();
- lock.writeLock().lock();
- try {
- if (pipelineStore != null) {
- pipelineStore.close();
- pipelineStore = null;
- }
- } catch (Exception ex) {
- LOG.error("Pipeline store close failed", ex);
- } finally {
- lock.writeLock().unlock();
- }
- }
-
- /**
- * returns min number of healthy volumes from the set of
- * datanodes constituting the pipeline.
- * @param pipeline
- * @return healthy volume count
- */
- @Override
- public int minHealthyVolumeNum(Pipeline pipeline) {
- return nodeManager.minHealthyVolumeNum(pipeline.getNodes());
- }
-
- /**
- * returns max count of raft log volumes from the set of
- * datanodes constituting the pipeline.
- * @param pipeline
- * @return healthy volume count
- */
- @Override
- public int minPipelineLimit(Pipeline pipeline) {
- return nodeManager.minPipelineLimit(pipeline.getNodes());
- }
-
- protected ReadWriteLock getLock() {
- return lock;
- }
-
- @VisibleForTesting
- public PipelineFactory getPipelineFactory() {
- return pipelineFactory;
- }
-
- protected NodeManager getNodeManager() {
- return nodeManager;
- }
-
- @Override
- public boolean getSafeModeStatus() {
- return this.isInSafeMode.get();
- }
-
- @Override
- public void reinitialize(Table<PipelineID, Pipeline> store)
- throws IOException {
- throw new RuntimeException("Not supported operation.");
- }
-
- @Override
- public void freezePipelineCreation() {
- freezePipelineCreation.set(true);
- backgroundPipelineCreator.pause();
- }
-
- @Override
- public void resumePipelineCreation() {
- freezePipelineCreation.set(false);
- backgroundPipelineCreator.resume();
- }
-
- public Table<PipelineID, Pipeline> getPipelineStore() {
- return pipelineStore;
- }
-
- @Override
- public void onMessage(SafeModeStatus status,
- EventPublisher publisher) {
- // TODO: #CLUTIL - handle safemode getting re-enabled
- boolean currentAllowPipelines =
- pipelineCreationAllowed.getAndSet(status.isPreCheckComplete());
- boolean currentlyInSafeMode =
- isInSafeMode.getAndSet(status.isInSafeMode());
-
- // Trigger pipeline creation only if the preCheck status has changed to
- // complete.
- if (isPipelineCreationAllowed() && !currentAllowPipelines) {
- triggerPipelineCreation();
- }
- // Start the pipeline creation thread only when safemode switches off
- if (!getSafeModeStatus() && currentlyInSafeMode) {
- startPipelineCreator();
- }
- }
-
- @VisibleForTesting
- protected static Logger getLog() {
- return LOG;
- }
-}
\ No newline at end of file
diff --git
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
index eac04a0..864a3fc 100644
---
a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
+++
b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java
@@ -104,7 +104,7 @@ import org.apache.hadoop.hdds.scm.node.StaleNodeHandler;
import org.apache.hadoop.hdds.scm.node.NodeDecommissionManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineActionHandler;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineManagerV2Impl;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManagerImpl;
import org.apache.hadoop.hdds.scm.pipeline.PipelineReportHandler;
import
org.apache.hadoop.hdds.scm.pipeline.choose.algorithms.PipelineChoosePolicyFactory;
import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
@@ -564,7 +564,7 @@ public final class StorageContainerManager extends
ServiceRuntimeInfoImpl
pipelineManager = configurator.getPipelineManager();
} else {
pipelineManager =
- PipelineManagerV2Impl.newPipelineManager(
+ PipelineManagerImpl.newPipelineManager(
conf,
scmHAManager,
scmNodeManager,
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
index c033831..cfc9be4 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/block/TestBlockManager.java
@@ -57,7 +57,7 @@ import
org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineManagerV2Impl;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManagerImpl;
import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
import org.apache.hadoop.hdds.scm.server.SCMConfigurator;
@@ -88,7 +88,7 @@ public class TestBlockManager {
private StorageContainerManager scm;
private ContainerManagerV2 mapping;
private MockNodeManager nodeManager;
- private PipelineManagerV2Impl pipelineManager;
+ private PipelineManagerImpl pipelineManager;
private BlockManagerImpl blockManager;
private SCMHAManager scmHAManager;
private SequenceIdGenerator sequenceIdGen;
@@ -136,7 +136,7 @@ public class TestBlockManager {
conf, scmHAManager, scmMetadataStore.getSequenceIdTable());
pipelineManager =
- PipelineManagerV2Impl.newPipelineManager(
+ PipelineManagerImpl.newPipelineManager(
conf,
scmHAManager,
nodeManager,
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
index 289adb1..4d06401 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestCloseContainerEventHandler.java
@@ -39,7 +39,7 @@ import org.apache.hadoop.hdds.scm.ha.SequenceIdGenerator;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineManagerV2Impl;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManagerImpl;
import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.ozone.OzoneConsts;
@@ -63,7 +63,7 @@ public class TestCloseContainerEventHandler {
private static OzoneConfiguration configuration;
private static MockNodeManager nodeManager;
- private static PipelineManagerV2Impl pipelineManager;
+ private static PipelineManagerImpl pipelineManager;
private static ContainerManagerV2 containerManager;
private static long size;
private static File testDir;
@@ -94,7 +94,7 @@ public class TestCloseContainerEventHandler {
SCMServiceManager serviceManager = new SCMServiceManager();
pipelineManager =
- PipelineManagerV2Impl.newPipelineManager(
+ PipelineManagerImpl.newPipelineManager(
configuration,
scmhaManager,
nodeManager,
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
index 5c02354..35ef3e4 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/TestSCMContainerManager.java
@@ -48,7 +48,7 @@ import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineManagerV2Impl;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManagerImpl;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.ozone.OzoneConsts;
import org.apache.hadoop.ozone.container.common.SCMTestUtils;
@@ -69,7 +69,7 @@ import org.junit.rules.ExpectedException;
public class TestSCMContainerManager {
private static SCMContainerManager containerManager;
private static MockNodeManager nodeManager;
- private static PipelineManagerV2Impl pipelineManager;
+ private static PipelineManagerImpl pipelineManager;
private static File testDir;
private static XceiverClientManager xceiverClientManager;
private static Random random;
@@ -95,7 +95,7 @@ public class TestSCMContainerManager {
}
nodeManager = new MockNodeManager(true, 10);
SCMMetadataStore scmMetadataStore = new SCMMetadataStoreImpl(conf);
- pipelineManager = PipelineManagerV2Impl.newPipelineManager(
+ pipelineManager = PipelineManagerImpl.newPipelineManager(
conf,
MockSCMHAManager.getInstance(true),
nodeManager,
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
index a2acc38..1493b03 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestContainerPlacement.java
@@ -44,7 +44,7 @@ import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineManagerV2Impl;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManagerImpl;
import org.apache.hadoop.hdds.scm.server.SCMStorageConfig;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager;
@@ -131,7 +131,7 @@ public class TestContainerPlacement {
EventQueue eventQueue = new EventQueue();
PipelineManager pipelineManager =
- PipelineManagerV2Impl.newPipelineManager(
+ PipelineManagerImpl.newPipelineManager(
config,
MockSCMHAManager.getInstance(true),
scmNodeManager,
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
index 2a04af4..0786832 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestDeadNodeHandler.java
@@ -55,7 +55,7 @@ import org.apache.hadoop.hdds.scm.events.SCMEvents;
import org.apache.hadoop.hdds.scm.node.states.NodeNotFoundException;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineManagerV2Impl;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManagerImpl;
import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
import org.apache.hadoop.hdds.scm.server.StorageContainerManager;
import org.apache.hadoop.hdds.server.events.EventPublisher;
@@ -81,7 +81,7 @@ public class TestDeadNodeHandler {
private SCMNodeManager nodeManager;
private ContainerManagerV2 containerManager;
private NodeReportHandler nodeReportHandler;
- private PipelineManagerV2Impl pipelineManager;
+ private PipelineManagerImpl pipelineManager;
private DeadNodeHandler deadNodeHandler;
private EventPublisher publisher;
private EventQueue eventQueue;
@@ -102,7 +102,7 @@ public class TestDeadNodeHandler {
scm = TestUtils.getScm(conf);
nodeManager = (SCMNodeManager) scm.getScmNodeManager();
pipelineManager =
- (PipelineManagerV2Impl)scm.getPipelineManager();
+ (PipelineManagerImpl)scm.getPipelineManager();
PipelineProvider mockRatisProvider =
new MockRatisPipelineProvider(nodeManager,
pipelineManager.getStateManager(), conf);
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
index a0bb44a..d76df1e 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestPipelineManagerImpl.java
@@ -103,9 +103,9 @@ public class TestPipelineManagerImpl {
FileUtil.fullyDelete(testDir);
}
- private PipelineManagerV2Impl createPipelineManager(boolean isLeader)
+ private PipelineManagerImpl createPipelineManager(boolean isLeader)
throws IOException {
- return PipelineManagerV2Impl.newPipelineManager(conf,
+ return PipelineManagerImpl.newPipelineManager(conf,
MockSCMHAManager.getInstance(isLeader),
new MockNodeManager(true, 20),
SCMDBDefinition.PIPELINES.getTable(dbStore),
@@ -114,9 +114,9 @@ public class TestPipelineManagerImpl {
serviceManager);
}
- private PipelineManagerV2Impl createPipelineManager(
+ private PipelineManagerImpl createPipelineManager(
boolean isLeader, SCMHADBTransactionBuffer buffer) throws IOException {
- return PipelineManagerV2Impl.newPipelineManager(conf,
+ return PipelineManagerImpl.newPipelineManager(conf,
MockSCMHAManager.getInstance(isLeader, buffer),
new MockNodeManager(true, 20),
SCMDBDefinition.PIPELINES.getTable(dbStore),
@@ -129,7 +129,7 @@ public class TestPipelineManagerImpl {
public void testCreatePipeline() throws Exception {
SCMHADBTransactionBuffer buffer1 =
new MockSCMHADBTransactionBuffer(dbStore);
- PipelineManagerV2Impl pipelineManager =
+ PipelineManagerImpl pipelineManager =
createPipelineManager(true, buffer1);
Assert.assertTrue(pipelineManager.getPipelines().isEmpty());
Pipeline pipeline1 = pipelineManager.createPipeline(
@@ -146,7 +146,7 @@ public class TestPipelineManagerImpl {
SCMHADBTransactionBuffer buffer2 =
new MockSCMHADBTransactionBuffer(dbStore);
- PipelineManagerV2Impl pipelineManager2 =
+ PipelineManagerImpl pipelineManager2 =
createPipelineManager(true, buffer2);
// Should be able to load previous pipelines.
Assert.assertFalse(pipelineManager2.getPipelines().isEmpty());
@@ -162,7 +162,7 @@ public class TestPipelineManagerImpl {
@Test
public void testCreatePipelineShouldFailOnFollower() throws Exception {
- PipelineManagerV2Impl pipelineManager = createPipelineManager(false);
+ PipelineManagerImpl pipelineManager = createPipelineManager(false);
Assert.assertTrue(pipelineManager.getPipelines().isEmpty());
try {
pipelineManager
@@ -178,7 +178,7 @@ public class TestPipelineManagerImpl {
@Test
public void testUpdatePipelineStates() throws Exception {
SCMHADBTransactionBuffer buffer = new
MockSCMHADBTransactionBuffer(dbStore);
- PipelineManagerV2Impl pipelineManager =
+ PipelineManagerImpl pipelineManager =
createPipelineManager(true, buffer);
Table<PipelineID, Pipeline> pipelineStore =
SCMDBDefinition.PIPELINES.getTable(dbStore);
@@ -221,7 +221,7 @@ public class TestPipelineManagerImpl {
@Test
public void testOpenPipelineShouldFailOnFollower() throws Exception {
- PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
+ PipelineManagerImpl pipelineManager = createPipelineManager(true);
Pipeline pipeline = pipelineManager.createPipeline(
new RatisReplicationConfig(ReplicationFactor.THREE));
Assert.assertEquals(1, pipelineManager.getPipelines().size());
@@ -242,7 +242,7 @@ public class TestPipelineManagerImpl {
@Test
public void testActivatePipelineShouldFailOnFollower() throws Exception {
- PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
+ PipelineManagerImpl pipelineManager = createPipelineManager(true);
Pipeline pipeline = pipelineManager.createPipeline(
new RatisReplicationConfig(ReplicationFactor.THREE));
Assert.assertEquals(1, pipelineManager.getPipelines().size());
@@ -263,7 +263,7 @@ public class TestPipelineManagerImpl {
@Test
public void testDeactivatePipelineShouldFailOnFollower() throws Exception {
- PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
+ PipelineManagerImpl pipelineManager = createPipelineManager(true);
Pipeline pipeline = pipelineManager.createPipeline(
new RatisReplicationConfig(ReplicationFactor.THREE));
Assert.assertEquals(1, pipelineManager.getPipelines().size());
@@ -284,7 +284,7 @@ public class TestPipelineManagerImpl {
@Test
public void testRemovePipeline() throws Exception {
- PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
+ PipelineManagerImpl pipelineManager = createPipelineManager(true);
// Create a pipeline
Pipeline pipeline = pipelineManager.createPipeline(
new RatisReplicationConfig(ReplicationFactor.THREE));
@@ -325,7 +325,7 @@ public class TestPipelineManagerImpl {
@Test
public void testClosePipelineShouldFailOnFollower() throws Exception {
- PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
+ PipelineManagerImpl pipelineManager = createPipelineManager(true);
Pipeline pipeline = pipelineManager.createPipeline(
new RatisReplicationConfig(ReplicationFactor.THREE));
Assert.assertEquals(1, pipelineManager.getPipelines().size());
@@ -346,7 +346,7 @@ public class TestPipelineManagerImpl {
@Test
public void testPipelineReport() throws Exception {
- PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
+ PipelineManagerImpl pipelineManager = createPipelineManager(true);
SCMSafeModeManager scmSafeModeManager =
new SCMSafeModeManager(conf, new ArrayList<>(), null, pipelineManager,
new EventQueue(), serviceManager, scmContext);
@@ -395,7 +395,7 @@ public class TestPipelineManagerImpl {
@Test
public void testPipelineCreationFailedMetric() throws Exception {
- PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
+ PipelineManagerImpl pipelineManager = createPipelineManager(true);
// No pipeline at start
MetricsRecordBuilder metrics = getMetrics(
@@ -450,7 +450,7 @@ public class TestPipelineManagerImpl {
public void testPipelineOpenOnlyWhenLeaderReported() throws Exception {
SCMHADBTransactionBuffer buffer1 =
new MockSCMHADBTransactionBuffer(dbStore);
- PipelineManagerV2Impl pipelineManager =
+ PipelineManagerImpl pipelineManager =
createPipelineManager(true, buffer1);
Pipeline pipeline = pipelineManager
@@ -499,7 +499,7 @@ public class TestPipelineManagerImpl {
OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, -1,
TimeUnit.MILLISECONDS);
- PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
+ PipelineManagerImpl pipelineManager = createPipelineManager(true);
Pipeline pipeline = pipelineManager
.createPipeline(new RatisReplicationConfig(ReplicationFactor.THREE));
// At this point, pipeline is not at OPEN stage.
@@ -528,7 +528,7 @@ public class TestPipelineManagerImpl {
OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, -1,
TimeUnit.MILLISECONDS);
- PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
+ PipelineManagerImpl pipelineManager = createPipelineManager(true);
Pipeline pipeline = pipelineManager
.createPipeline(new RatisReplicationConfig(ReplicationFactor.THREE));
// At this point, pipeline is not at OPEN stage.
@@ -565,7 +565,7 @@ public class TestPipelineManagerImpl {
scmContext.updateSafeModeStatus(
new SCMSafeModeManager.SafeModeStatus(true, false));
- PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
+ PipelineManagerImpl pipelineManager = createPipelineManager(true);
try {
pipelineManager
.createPipeline(new RatisReplicationConfig(ReplicationFactor.THREE));
@@ -602,7 +602,7 @@ public class TestPipelineManagerImpl {
OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, -1,
TimeUnit.MILLISECONDS);
- PipelineManagerV2Impl pipelineManager = createPipelineManager(true);
+ PipelineManagerImpl pipelineManager = createPipelineManager(true);
scmContext.updateSafeModeStatus(
new SCMSafeModeManager.SafeModeStatus(true, false));
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
deleted file mode 100644
index c1d9db9..0000000
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/pipeline/TestSCMPipelineManager.java
+++ /dev/null
@@ -1,821 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- * <p>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p>
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdds.scm.pipeline;
-
-import java.io.File;
-import java.io.IOException;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.NoSuchElementException;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.TimeoutException;
-import java.util.stream.Collectors;
-
-import org.apache.hadoop.fs.FileUtil;
-import org.apache.hadoop.hdds.HddsConfigKeys;
-import org.apache.hadoop.hdds.client.RatisReplicationConfig;
-import org.apache.hadoop.hdds.conf.OzoneConfiguration;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.scm.TestUtils;
-import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.container.MockNodeManager;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.apache.hadoop.hdds.scm.ha.SCMContext;
-import org.apache.hadoop.hdds.scm.ha.SCMServiceManager;
-import org.apache.hadoop.hdds.scm.metadata.PipelineIDCodec;
-import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStore;
-import org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
-import org.apache.hadoop.hdds.scm.node.NodeStatus;
-import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager;
-import
org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher.PipelineReportFromDatanode;
-import org.apache.hadoop.hdds.server.events.EventQueue;
-import org.apache.hadoop.hdds.utils.db.Table;
-import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
-import org.apache.hadoop.hdds.utils.db.TableIterator;
-import org.apache.hadoop.metrics2.MetricsRecordBuilder;
-import org.apache.ozone.test.GenericTestUtils;
-
-import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_PIPELINE_LIMIT;
-import static
org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT;
-import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
-import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
-
-import org.apache.ozone.test.GenericTestUtils.LogCapturer;
-import org.junit.jupiter.api.AfterEach;
-import org.junit.jupiter.api.BeforeEach;
-import org.junit.jupiter.api.Test;
-import org.junit.rules.TemporaryFolder;
-import org.mockito.InOrder;
-
-
-import static org.junit.jupiter.api.Assertions.assertEquals;
-import static org.junit.jupiter.api.Assertions.assertFalse;
-import static org.junit.jupiter.api.Assertions.assertNotNull;
-import static org.junit.jupiter.api.Assertions.assertTrue;
-import static org.junit.jupiter.api.Assertions.fail;
-import static org.mockito.Mockito.doReturn;
-import static org.mockito.Mockito.inOrder;
-import static org.mockito.Mockito.mock;
-import static org.mockito.Mockito.never;
-import static org.mockito.Mockito.verify;
-import static org.mockito.Mockito.when;
-import static org.slf4j.event.Level.INFO;
-
-/**
- * Test cases to verify PipelineManager.
- */
-class TestSCMPipelineManager {
- private MockNodeManager nodeManager;
- private File testDir;
- private OzoneConfiguration conf;
- private SCMMetadataStore scmMetadataStore;
-
- @BeforeEach
- void setUp() throws Exception {
- conf = new OzoneConfiguration();
- conf.setInt(OZONE_DATANODE_PIPELINE_LIMIT, 1);
- testDir = GenericTestUtils
- .getTestDir(TestSCMPipelineManager.class.getSimpleName());
- conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
- conf.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION, false);
- boolean folderExisted = testDir.exists() || testDir.mkdirs();
- if (!folderExisted) {
- throw new IOException("Unable to create test directory path");
- }
- nodeManager = new MockNodeManager(true, 20);
- nodeManager.setNumPipelinePerDatanode(1);
-
- scmMetadataStore = new SCMMetadataStoreImpl(conf);
- }
-
- @AfterEach
- void cleanup() throws Exception {
- scmMetadataStore.getStore().close();
- FileUtil.fullyDelete(testDir);
- }
-
- @Test
- void testPipelineReload() throws IOException {
- SCMPipelineManager pipelineManager =
- new SCMPipelineManager(conf,
- nodeManager,
- scmMetadataStore.getPipelineTable(),
- new EventQueue());
- pipelineManager.allowPipelineCreation();
- PipelineProvider mockRatisProvider =
- new MockRatisPipelineProvider(nodeManager,
- pipelineManager.getStateManager(), conf);
- pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
- mockRatisProvider);
- int pipelineNum = 5;
-
- Set<Pipeline> pipelines = new HashSet<>();
- for (int i = 0; i < pipelineNum; i++) {
- Pipeline pipeline = pipelineManager
- .createPipeline(new RatisReplicationConfig(ReplicationFactor.THREE));
- pipelineManager.openPipeline(pipeline.getId());
- pipelines.add(pipeline);
- }
- pipelineManager.close();
-
- // new pipeline manager should be able to load the pipelines from the db
- pipelineManager =
- new SCMPipelineManager(conf, nodeManager,
- scmMetadataStore.getPipelineTable(), new EventQueue());
- pipelineManager.allowPipelineCreation();
- mockRatisProvider =
- new MockRatisPipelineProvider(nodeManager,
- pipelineManager.getStateManager(), conf);
- pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
- mockRatisProvider);
- for (Pipeline p : pipelines) {
- // After reload, pipelines should be in open state
- assertTrue(pipelineManager.getPipeline(p.getId()).isOpen());
- }
- List<Pipeline> pipelineList =
- pipelineManager.getPipelines(new RatisReplicationConfig(
- ReplicationFactor.THREE));
- assertEquals(pipelines, new HashSet<>(pipelineList));
-
- Set<Set<DatanodeDetails>> originalPipelines = pipelineList.stream()
- .map(Pipeline::getNodeSet).collect(Collectors.toSet());
- Set<Set<DatanodeDetails>> reloadedPipelineHash = pipelines.stream()
- .map(Pipeline::getNodeSet).collect(Collectors.toSet());
- assertEquals(reloadedPipelineHash, originalPipelines);
- assertEquals(pipelineNum, originalPipelines.size());
-
- // clean up
- for (Pipeline pipeline : pipelines) {
- pipelineManager.closePipeline(pipeline, false);
- }
- pipelineManager.close();
- }
-
- @Test
- void testRemovePipeline() throws IOException {
- SCMPipelineManager pipelineManager =
- new SCMPipelineManager(conf, nodeManager,
- scmMetadataStore.getPipelineTable(), new EventQueue());
- pipelineManager.allowPipelineCreation();
- PipelineProvider mockRatisProvider =
- new MockRatisPipelineProvider(nodeManager,
- pipelineManager.getStateManager(), conf);
- pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
- mockRatisProvider);
-
- Pipeline pipeline = pipelineManager
- .createPipeline(new RatisReplicationConfig(ReplicationFactor.THREE));
- pipelineManager.openPipeline(pipeline.getId());
- pipelineManager
- .addContainerToPipeline(pipeline.getId(), ContainerID.valueOf(1));
- pipelineManager.closePipeline(pipeline, false);
- pipelineManager.close();
-
- // new pipeline manager should not be able to load removed pipelines
- pipelineManager =
- new SCMPipelineManager(conf, nodeManager,
- scmMetadataStore.getPipelineTable(), new EventQueue());
- try {
- pipelineManager.getPipeline(pipeline.getId());
- fail("Pipeline should not have been retrieved");
- } catch (IOException e) {
- assertTrue(e.getMessage().contains("not found"));
- }
-
- // clean up
- pipelineManager.close();
- }
-
- @Test
- void testPipelineReport() throws IOException {
- EventQueue eventQueue = new EventQueue();
- SCMPipelineManager pipelineManager =
- new SCMPipelineManager(conf, nodeManager,
- scmMetadataStore.getPipelineTable(), eventQueue);
- pipelineManager.allowPipelineCreation();
- PipelineProvider mockRatisProvider =
- new MockRatisPipelineProvider(nodeManager,
- pipelineManager.getStateManager(), conf);
- pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
- mockRatisProvider);
-
- SCMSafeModeManager scmSafeModeManager =
- new SCMSafeModeManager(conf, new ArrayList<>(), null,
- pipelineManager, eventQueue, new SCMServiceManager(),
- SCMContext.emptyContext());
-
- // create a pipeline in allocated state with no dns yet reported
- Pipeline pipeline = pipelineManager
- .createPipeline(new RatisReplicationConfig(ReplicationFactor.THREE));
-
- assertFalse(pipelineManager.getPipeline(pipeline.getId()).isHealthy());
- assertFalse(pipelineManager.getPipeline(pipeline.getId()).isOpen());
-
- // pipeline is not healthy until all dns report
- List<DatanodeDetails> nodes = pipeline.getNodes();
- assertFalse(
- pipelineManager.getPipeline(pipeline.getId()).isHealthy());
- // get pipeline report from each dn in the pipeline
- PipelineReportHandler pipelineReportHandler =
- new PipelineReportHandler(scmSafeModeManager, pipelineManager,
- SCMContext.emptyContext(), conf);
- nodes.subList(0, 2).forEach(dn -> sendPipelineReport(dn, pipeline,
- pipelineReportHandler, false, eventQueue));
- sendPipelineReport(nodes.get(nodes.size() - 1), pipeline,
- pipelineReportHandler, true, eventQueue);
-
- // pipeline is healthy when all dns report
- assertTrue(pipelineManager.getPipeline(pipeline.getId()).isHealthy());
- // pipeline should now move to open state
- assertTrue(pipelineManager.getPipeline(pipeline.getId()).isOpen());
-
- // close the pipeline
- pipelineManager.closePipeline(pipeline, false);
-
- // pipeline report for destroyed pipeline should be ignored
- nodes.subList(0, 2).forEach(dn -> sendPipelineReport(dn, pipeline,
- pipelineReportHandler, false, eventQueue));
- sendPipelineReport(nodes.get(nodes.size() - 1), pipeline,
- pipelineReportHandler, true, eventQueue);
-
- try {
- pipelineManager.getPipeline(pipeline.getId());
- fail("Pipeline should not have been retrieved");
- } catch (IOException e) {
- assertTrue(e.getMessage().contains("not found"));
- }
-
- // clean up
- pipelineManager.close();
- }
-
- @Test
- void testPipelineCreationFailedMetric() throws Exception {
- MockNodeManager nodeManagerMock = new MockNodeManager(true,
- 20);
- SCMPipelineManager pipelineManager =
- new SCMPipelineManager(conf, nodeManagerMock,
- scmMetadataStore.getPipelineTable(), new EventQueue());
- pipelineManager.allowPipelineCreation();
- nodeManagerMock.setNumPipelinePerDatanode(1);
- PipelineProvider mockRatisProvider =
- new MockRatisPipelineProvider(nodeManagerMock,
- pipelineManager.getStateManager(), conf);
- pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
- mockRatisProvider);
-
- MetricsRecordBuilder metrics = getMetrics(
- SCMPipelineMetrics.class.getSimpleName());
- long numPipelineAllocated = getLongCounter("NumPipelineAllocated",
- metrics);
- assertEquals(0, numPipelineAllocated);
-
- // 3 DNs are unhealthy.
- // Create 5 pipelines (Use up 15 Datanodes)
- for (int i = 0; i < 5; i++) {
- Pipeline pipeline = pipelineManager
- .createPipeline(new RatisReplicationConfig(ReplicationFactor.THREE));
- assertNotNull(pipeline);
- }
-
- metrics = getMetrics(
- SCMPipelineMetrics.class.getSimpleName());
- numPipelineAllocated = getLongCounter("NumPipelineAllocated", metrics);
- assertEquals(5, numPipelineAllocated);
-
- long numPipelineCreateFailed = getLongCounter(
- "NumPipelineCreationFailed", metrics);
- assertEquals(0, numPipelineCreateFailed);
-
- LogCapturer logs = LogCapturer.captureLogs(SCMPipelineManager.getLog());
- GenericTestUtils.setLogLevel(SCMPipelineManager.getLog(), INFO);
- //This should fail...
- try {
- pipelineManager
- .createPipeline(new RatisReplicationConfig(ReplicationFactor.THREE));
- fail();
- } catch (SCMException ioe) {
- // pipeline creation failed this time.
- assertEquals(SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE,
- ioe.getResult());
- assertFalse(logs.getOutput().contains(
- "Failed to create pipeline of type"));
- } finally {
- logs.stopCapturing();
- }
-
- metrics = getMetrics(
- SCMPipelineMetrics.class.getSimpleName());
- numPipelineAllocated = getLongCounter("NumPipelineAllocated", metrics);
- assertEquals(5, numPipelineAllocated);
-
- numPipelineCreateFailed = getLongCounter(
- "NumPipelineCreationFailed", metrics);
- assertEquals(1, numPipelineCreateFailed);
-
- // clean up
- pipelineManager.close();
- }
-
- @Test
- void testPipelineLimit() throws Exception {
- int numMetaDataVolumes = 2;
- final OzoneConfiguration config = new OzoneConfiguration();
- config.set(HddsConfigKeys.OZONE_METADATA_DIRS, testDir.getAbsolutePath());
- config.setBoolean(HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_CREATION,
- false);
- // turning off this config will ensure, pipeline creation is determined by
- // metadata volume count.
- config.setInt(OZONE_DATANODE_PIPELINE_LIMIT, 0);
- MockNodeManager nodeManagerMock = new MockNodeManager(true,
- 3);
- nodeManagerMock.setNumMetaDataVolumes(numMetaDataVolumes);
- int pipelinePerDn = numMetaDataVolumes *
- MockNodeManager.NUM_PIPELINE_PER_METADATA_DISK;
- nodeManagerMock.setNumPipelinePerDatanode(pipelinePerDn);
- SCMPipelineManager pipelineManager =
- new SCMPipelineManager(config, nodeManagerMock,
- scmMetadataStore.getPipelineTable(), new EventQueue());
- pipelineManager.allowPipelineCreation();
- PipelineProvider mockRatisProvider =
- new MockRatisPipelineProvider(nodeManagerMock,
- pipelineManager.getStateManager(), config);
- pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
- mockRatisProvider);
-
- MetricsRecordBuilder metrics = getMetrics(
- SCMPipelineMetrics.class.getSimpleName());
- long numPipelineAllocated = getLongCounter("NumPipelineAllocated",
- metrics);
- assertEquals(0, numPipelineAllocated);
-
- // one node pipeline creation will not be accounted for
- // pipeline limit determination
- pipelineManager
- .createPipeline(new RatisReplicationConfig(ReplicationFactor.ONE));
- // max limit on no of pipelines is 4
- for (int i = 0; i < pipelinePerDn; i++) {
- Pipeline pipeline = pipelineManager
- .createPipeline(new RatisReplicationConfig(ReplicationFactor.THREE));
- assertNotNull(pipeline);
- }
-
- metrics = getMetrics(
- SCMPipelineMetrics.class.getSimpleName());
- numPipelineAllocated = getLongCounter("NumPipelineAllocated", metrics);
- assertEquals(5, numPipelineAllocated);
-
- long numPipelineCreateFailed = getLongCounter(
- "NumPipelineCreationFailed", metrics);
- assertEquals(0, numPipelineCreateFailed);
- //This should fail...
- try {
- pipelineManager
- .createPipeline(new RatisReplicationConfig(ReplicationFactor.THREE));
- fail();
- } catch (SCMException ioe) {
- // pipeline creation failed this time.
- assertEquals(SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE,
- ioe.getResult());
- }
-
- metrics = getMetrics(
- SCMPipelineMetrics.class.getSimpleName());
- numPipelineAllocated = getLongCounter("NumPipelineAllocated", metrics);
- assertEquals(5, numPipelineAllocated);
-
- numPipelineCreateFailed = getLongCounter(
- "NumPipelineCreationFailed", metrics);
- assertEquals(1, numPipelineCreateFailed);
-
- // clean up
- pipelineManager.close();
- }
-
- @Test
- void testActivateDeactivatePipeline() throws IOException {
- final SCMPipelineManager pipelineManager =
- new SCMPipelineManager(conf, nodeManager,
- scmMetadataStore.getPipelineTable(), new EventQueue());
- pipelineManager.allowPipelineCreation();
- final PipelineProvider mockRatisProvider =
- new MockRatisPipelineProvider(nodeManager,
- pipelineManager.getStateManager(), conf);
-
- pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
- mockRatisProvider);
-
- final Pipeline pipeline = pipelineManager
- .createPipeline(new RatisReplicationConfig(ReplicationFactor.THREE));
- final PipelineID pid = pipeline.getId();
-
- pipelineManager.openPipeline(pid);
- pipelineManager.addContainerToPipeline(pid, ContainerID.valueOf(1));
-
- assertTrue(pipelineManager
- .getPipelines(new RatisReplicationConfig(ReplicationFactor.THREE),
- Pipeline.PipelineState.OPEN).contains(pipeline));
-
- assertEquals(Pipeline.PipelineState.OPEN,
- pipelineManager.getPipeline(pid).getPipelineState());
-
- pipelineManager.deactivatePipeline(pid);
- assertEquals(Pipeline.PipelineState.DORMANT,
- pipelineManager.getPipeline(pid).getPipelineState());
-
- assertFalse(pipelineManager
- .getPipelines(new RatisReplicationConfig(ReplicationFactor.THREE),
- Pipeline.PipelineState.OPEN).contains(pipeline));
-
- pipelineManager.activatePipeline(pid);
-
- assertTrue(pipelineManager
- .getPipelines(new RatisReplicationConfig(ReplicationFactor.THREE),
- Pipeline.PipelineState.OPEN).contains(pipeline));
-
- pipelineManager.close();
- }
-
- @Test
- void testPipelineOpenOnlyWhenLeaderReported() throws Exception {
- EventQueue eventQueue = new EventQueue();
- SCMPipelineManager pipelineManager =
- new SCMPipelineManager(conf, nodeManager,
- scmMetadataStore.getPipelineTable(), eventQueue);
- pipelineManager.allowPipelineCreation();
- PipelineProvider mockRatisProvider =
- new MockRatisPipelineProvider(nodeManager,
- pipelineManager.getStateManager(), conf);
- pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
- mockRatisProvider);
- pipelineManager.onMessage(
- new SCMSafeModeManager.SafeModeStatus(true, true), null);
- Pipeline pipeline = pipelineManager
- .createPipeline(new RatisReplicationConfig(ReplicationFactor.THREE));
- // close manager
- pipelineManager.close();
- // new pipeline manager loads the pipelines from the db in ALLOCATED state
- pipelineManager =
- new SCMPipelineManager(conf, nodeManager,
- scmMetadataStore.getPipelineTable(), eventQueue);
- mockRatisProvider =
- new MockRatisPipelineProvider(nodeManager,
- pipelineManager.getStateManager(), conf);
- pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
- mockRatisProvider);
- assertEquals(Pipeline.PipelineState.ALLOCATED,
- pipelineManager.getPipeline(pipeline.getId()).getPipelineState());
-
- SCMSafeModeManager scmSafeModeManager =
- new SCMSafeModeManager(new OzoneConfiguration(), new ArrayList<>(),
- null, pipelineManager, eventQueue,
- new SCMServiceManager(),
- SCMContext.emptyContext());
- PipelineReportHandler pipelineReportHandler =
- new PipelineReportHandler(scmSafeModeManager, pipelineManager,
- SCMContext.emptyContext(), conf);
-
- // Report pipelines with leaders
- List<DatanodeDetails> nodes = pipeline.getNodes();
- assertEquals(3, nodes.size());
- // Send report for all but no leader
- nodes.forEach(dn -> sendPipelineReport(dn, pipeline, pipelineReportHandler,
- false, eventQueue));
-
- assertEquals(Pipeline.PipelineState.ALLOCATED,
- pipelineManager.getPipeline(pipeline.getId()).getPipelineState());
-
- nodes.subList(0, 2).forEach(dn -> sendPipelineReport(dn, pipeline,
- pipelineReportHandler, false, eventQueue));
- sendPipelineReport(nodes.get(nodes.size() - 1), pipeline,
- pipelineReportHandler, true, eventQueue);
-
- assertEquals(Pipeline.PipelineState.OPEN,
- pipelineManager.getPipeline(pipeline.getId()).getPipelineState());
-
- pipelineManager.close();
- }
-
- @Test
- void testScrubPipeline() throws IOException {
- // No timeout for pipeline scrubber.
- conf.setTimeDuration(
- OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, -1,
- TimeUnit.MILLISECONDS);
-
- EventQueue eventQueue = new EventQueue();
- final SCMPipelineManager pipelineManager =
- new SCMPipelineManager(conf, nodeManager,
- scmMetadataStore.getPipelineTable(), eventQueue);
- pipelineManager.allowPipelineCreation();
- final PipelineProvider ratisProvider = new MockRatisPipelineProvider(
- nodeManager, pipelineManager.getStateManager(), conf, eventQueue,
- false);
-
- pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
- ratisProvider);
-
- Pipeline pipeline = pipelineManager
- .createPipeline(new RatisReplicationConfig(ReplicationFactor.THREE));
- // At this point, pipeline is not at OPEN stage.
- assertEquals(Pipeline.PipelineState.ALLOCATED,
- pipeline.getPipelineState());
-
- // pipeline should be seen in pipelineManager as ALLOCATED.
- assertTrue(pipelineManager
- .getPipelines(new RatisReplicationConfig(ReplicationFactor.THREE),
- Pipeline.PipelineState.ALLOCATED).contains(pipeline));
- pipelineManager
- .scrubPipeline(new RatisReplicationConfig(ReplicationFactor.THREE));
-
- // pipeline should be scrubbed.
- assertFalse(pipelineManager
- .getPipelines(new RatisReplicationConfig(ReplicationFactor.THREE),
- Pipeline.PipelineState.ALLOCATED).contains(pipeline));
-
- pipelineManager.close();
- }
-
- @Test
- void testPipelineNotCreatedUntilSafeModePrecheck()
- throws IOException, TimeoutException, InterruptedException {
- // No timeout for pipeline scrubber.
- conf.setTimeDuration(
- OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, -1,
- TimeUnit.MILLISECONDS);
-
- EventQueue eventQueue = new EventQueue();
- SCMPipelineManager pipelineManager =
- new SCMPipelineManager(conf, nodeManager,
- scmMetadataStore.getPipelineTable(), eventQueue);
- final PipelineProvider ratisProvider = new MockRatisPipelineProvider(
- nodeManager, pipelineManager.getStateManager(), conf, eventQueue,
- false);
-
- pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
- ratisProvider);
-
- try {
- pipelineManager
- .createPipeline(new RatisReplicationConfig(ReplicationFactor.THREE));
- fail("Pipelines should not have been created");
- } catch (IOException e) {
- // expected
- }
-
- // Ensure a pipeline of factor ONE can be created - no exceptions should be
- // raised.
- pipelineManager
- .createPipeline(new RatisReplicationConfig(ReplicationFactor.ONE));
-
- // Simulate safemode check exiting.
- pipelineManager.onMessage(
- new SCMSafeModeManager.SafeModeStatus(true, true), null);
- GenericTestUtils.waitFor(() -> pipelineManager.getPipelines().size() != 0,
- 100, 10000);
- pipelineManager.close();
- }
-
- @Test
- void testSafeModeUpdatedOnSafemodeExit() throws IOException {
- // No timeout for pipeline scrubber.
- conf.setTimeDuration(
- OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, -1,
- TimeUnit.MILLISECONDS);
-
- EventQueue eventQueue = new EventQueue();
- SCMPipelineManager pipelineManager =
- new SCMPipelineManager(conf, nodeManager,
- scmMetadataStore.getPipelineTable(), eventQueue);
- final PipelineProvider ratisProvider = new MockRatisPipelineProvider(
- nodeManager, pipelineManager.getStateManager(), conf, eventQueue,
- false);
-
- pipelineManager.setPipelineProvider(HddsProtos.ReplicationType.RATIS,
- ratisProvider);
-
- assertTrue(pipelineManager.getSafeModeStatus());
- assertFalse(pipelineManager.isPipelineCreationAllowed());
- // First pass pre-check as true, but safemode still on
- pipelineManager.onMessage(
- new SCMSafeModeManager.SafeModeStatus(true, true), null);
- assertTrue(pipelineManager.getSafeModeStatus());
- assertTrue(pipelineManager.isPipelineCreationAllowed());
-
- // Then also turn safemode off
- pipelineManager.onMessage(
- new SCMSafeModeManager.SafeModeStatus(false, true), null);
- assertFalse(pipelineManager.getSafeModeStatus());
- assertTrue(pipelineManager.isPipelineCreationAllowed());
- pipelineManager.close();
- }
-
- /**
- * This test was created for HDDS-3925 to check whether the db handling is
- * proper at the SCMPipelineManager level. We should remove this test
- * when we remove the key swap from the SCMPipelineManager code.
- *
- * The test emulates internally the values that the iterator will provide
- * back to the check-fix code path. The iterator internally deserialize the
- * key stored in RocksDB using the PipelineIDCodec. The older version of the
- * codec serialized the PipelineIDs by taking the byte[] representation of
- * the protobuf representation of the PipelineID, and deserialization was not
- * implemented.
- *
- * In order to be able to check and fix the change, the deserialization was
- * introduced, and deserialisation of the old protobuf byte representation
- * with the new deserialization logic of the keys are
- * checked against the PipelineID serialized in the value as well via
- * protobuf.
- * The DB is storing the keys now based on a byte[] serialized from the UUID
- * inside the PipelineID.
- * For this we emulate the getKey of the KeyValue returned by the
- * iterator to return a PipelineID that is deserialized from the byte[]
- * representation of the protobuf representation of the PipelineID in the
- * test, as that would be the value we get from the iterator when iterating
- * through a table with the old key format.
- *
- * @throws Exception when something goes wrong
- */
- @Test
- void testPipelineDBKeyFormatChange() throws Exception {
- Pipeline p1 = pipelineStub();
- Pipeline p2 = pipelineStub();
- Pipeline p3 = pipelineStub();
-
- TableIterator<PipelineID, KeyValue<PipelineID, Pipeline>> iteratorMock =
- mock(TableIterator.class);
-
- KeyValue<PipelineID, Pipeline> kv1 =
- mockKeyValueToProvideOldKeyFormat(p1);
- KeyValue<PipelineID, Pipeline> kv2 =
- mockKeyValueToProvideNormalFormat(p2);
- KeyValue<PipelineID, Pipeline> kv3 =
- mockKeyValueToProvideOldKeyFormat(p3);
-
- when(iteratorMock.next())
- .thenReturn(kv1, kv2, kv3)
- .thenThrow(new NoSuchElementException());
- when(iteratorMock.hasNext())
- .thenReturn(true, true, true, false);
-
- Table<PipelineID, Pipeline> pipelineStore = mock(Table.class);
- doReturn(iteratorMock).when(pipelineStore).iterator();
- when(pipelineStore.isEmpty()).thenReturn(false);
-
- InOrder inorderVerifier = inOrder(pipelineStore, iteratorMock);
-
- new SCMPipelineManager(conf, nodeManager, pipelineStore, new EventQueue());
-
- inorderVerifier.verify(iteratorMock).removeFromDB();
- inorderVerifier.verify(pipelineStore).put(p1.getId(), p1);
- inorderVerifier.verify(iteratorMock).removeFromDB();
- inorderVerifier.verify(pipelineStore).put(p3.getId(), p3);
-
- verify(pipelineStore, never()).put(p2.getId(), p2);
- }
-
- @Test
- void testScmWithPipelineDBKeyFormatChange() throws Exception {
- TemporaryFolder tempDir = new TemporaryFolder();
- tempDir.create();
- File dir = tempDir.newFolder();
- conf.set(HddsConfigKeys.OZONE_METADATA_DIRS, dir.getAbsolutePath());
-
- SCMMetadataStore scmDbWithOldKeyFormat = null;
- Map<UUID, Pipeline> oldPipelines = new HashMap<>();
- try {
- scmDbWithOldKeyFormat =
- new TestSCMStoreImplWithOldPipelineIDKeyFormat(conf);
- // Create 3 pipelines.
- for (int i = 0; i < 3; i++) {
- Pipeline pipeline = pipelineStub();
- scmDbWithOldKeyFormat.getPipelineTable()
- .put(pipeline.getId(), pipeline);
- oldPipelines.put(pipeline.getId().getId(), pipeline);
- }
- } finally {
- if (scmDbWithOldKeyFormat != null) {
- scmDbWithOldKeyFormat.stop();
- }
- }
-
- LogCapturer logCapturer =
- LogCapturer.captureLogs(SCMPipelineManager.getLog());
-
- // Create SCMPipelineManager with new DBDefinition.
- SCMMetadataStore newScmMetadataStore = null;
- try {
- newScmMetadataStore = new SCMMetadataStoreImpl(conf);
- SCMPipelineManager pipelineManager = new SCMPipelineManager(conf,
- nodeManager,
- newScmMetadataStore.getPipelineTable(),
- new EventQueue());
-
- waitForLog(logCapturer);
- assertEquals(3, pipelineManager.getPipelines().size());
- oldPipelines.values().forEach(p ->
- pipelineManager.containsPipeline(p.getId()));
- } finally {
- if (newScmMetadataStore != null) {
- newScmMetadataStore.stop();
- }
- }
-
- // Mimicking another restart.
- try {
- logCapturer.clearOutput();
- newScmMetadataStore = new SCMMetadataStoreImpl(conf);
- SCMPipelineManager pipelineManager = new SCMPipelineManager(conf,
- nodeManager,
- newScmMetadataStore.getPipelineTable(),
- new EventQueue());
- try {
- waitForLog(logCapturer);
- fail("Unexpected log: " + logCapturer.getOutput());
- } catch (TimeoutException ex) {
- assertTrue(ex.getMessage().contains("Timed out"));
- }
- assertEquals(3, pipelineManager.getPipelines().size());
- oldPipelines.values().forEach(p ->
- pipelineManager.containsPipeline(p.getId()));
- } finally {
- newScmMetadataStore.stop();
- }
- }
-
- private static void waitForLog(LogCapturer logCapturer)
- throws TimeoutException, InterruptedException {
- GenericTestUtils.waitFor(() -> logCapturer.getOutput()
- .contains("Found pipeline in old format key"),
- 1000, 5000);
- }
-
- private Pipeline pipelineStub() {
- return Pipeline.newBuilder()
- .setId(PipelineID.randomId())
- .setReplicationConfig(new
RatisReplicationConfig(ReplicationFactor.ONE))
- .setState(Pipeline.PipelineState.OPEN)
- .setNodes(
- Arrays.asList(
- nodeManager.getNodes(NodeStatus.inServiceHealthy()).get(0)
- )
- )
- .setNodesInOrder(Arrays.asList(0))
- .build();
- }
-
- private KeyValue<PipelineID, Pipeline>
- mockKeyValueToProvideOldKeyFormat(Pipeline pipeline)
- throws IOException {
- KeyValue<PipelineID, Pipeline> kv = mock(KeyValue.class);
- when(kv.getValue()).thenReturn(pipeline);
- when(kv.getKey())
- .thenReturn(
- new PipelineIDCodec().fromPersistedFormat(
- pipeline.getId().getProtobuf().toByteArray()
- ));
- return kv;
- }
-
- private KeyValue<PipelineID, Pipeline>
- mockKeyValueToProvideNormalFormat(Pipeline pipeline)
- throws IOException {
- KeyValue<PipelineID, Pipeline> kv = mock(KeyValue.class);
- when(kv.getValue()).thenReturn(pipeline);
- when(kv.getKey()).thenReturn(pipeline.getId());
- return kv;
- }
-
- private void sendPipelineReport(DatanodeDetails dn,
- Pipeline pipeline, PipelineReportHandler pipelineReportHandler,
- boolean isLeader, EventQueue eventQueue) {
- PipelineReportFromDatanode report =
- TestUtils.getPipelineReportFromDatanode(dn, pipeline.getId(),
isLeader);
- pipelineReportHandler.onMessage(report, eventQueue);
- }
-}
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
index 14b839d..1b0d88a 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestHealthyPipelineSafeModeRule.java
@@ -41,7 +41,7 @@ import
org.apache.hadoop.hdds.scm.metadata.SCMMetadataStoreImpl;
import org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineManagerV2Impl;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManagerImpl;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.ozone.test.GenericTestUtils;
@@ -77,8 +77,8 @@ public class TestHealthyPipelineSafeModeRule {
SCMMetadataStore scmMetadataStore = new SCMMetadataStoreImpl(config);
try {
- PipelineManagerV2Impl pipelineManager =
- PipelineManagerV2Impl.newPipelineManager(
+ PipelineManagerImpl pipelineManager =
+ PipelineManagerImpl.newPipelineManager(
config,
MockSCMHAManager.getInstance(true),
nodeManager,
@@ -131,8 +131,8 @@ public class TestHealthyPipelineSafeModeRule {
SCMMetadataStore scmMetadataStore = new SCMMetadataStoreImpl(config);
try {
- PipelineManagerV2Impl pipelineManager =
- PipelineManagerV2Impl.newPipelineManager(
+ PipelineManagerImpl pipelineManager =
+ PipelineManagerImpl.newPipelineManager(
config,
MockSCMHAManager.getInstance(true),
nodeManager,
@@ -229,8 +229,8 @@ public class TestHealthyPipelineSafeModeRule {
SCMMetadataStore scmMetadataStore = new SCMMetadataStoreImpl(config);
try {
- PipelineManagerV2Impl pipelineManager =
- PipelineManagerV2Impl.newPipelineManager(
+ PipelineManagerImpl pipelineManager =
+ PipelineManagerImpl.newPipelineManager(
config,
MockSCMHAManager.getInstance(true),
nodeManager,
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
index e91a505..c9840e7 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestOneReplicaPipelineSafeModeRule.java
@@ -44,7 +44,7 @@ import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
import org.apache.hadoop.hdds.scm.pipeline.PipelineNotFoundException;
import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineManagerV2Impl;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManagerImpl;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
import org.apache.hadoop.hdds.server.events.EventQueue;
import org.apache.ozone.test.GenericTestUtils;
@@ -63,7 +63,7 @@ public class TestOneReplicaPipelineSafeModeRule {
@Rule
public TemporaryFolder folder = new TemporaryFolder();
private OneReplicaPipelineSafeModeRule rule;
- private PipelineManagerV2Impl pipelineManager;
+ private PipelineManagerImpl pipelineManager;
private EventQueue eventQueue;
private SCMServiceManager serviceManager;
private SCMContext scmContext;
@@ -90,7 +90,7 @@ public class TestOneReplicaPipelineSafeModeRule {
SCMMetadataStore scmMetadataStore =
new SCMMetadataStoreImpl(ozoneConfiguration);
- pipelineManager = PipelineManagerV2Impl.newPipelineManager(
+ pipelineManager = PipelineManagerImpl.newPipelineManager(
ozoneConfiguration,
MockSCMHAManager.getInstance(true),
mockNodeManager,
diff --git
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
index 8ac3fcd..ef6345e 100644
---
a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
+++
b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/safemode/TestSCMSafeModeManager.java
@@ -48,7 +48,7 @@ import
org.apache.hadoop.hdds.scm.pipeline.MockRatisPipelineProvider;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineManager;
import org.apache.hadoop.hdds.scm.pipeline.PipelineProvider;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineManagerV2Impl;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManagerImpl;
import org.apache.hadoop.hdds.scm.server.SCMDatanodeHeartbeatDispatcher;
import org.apache.hadoop.hdds.server.events.EventHandler;
import org.apache.hadoop.hdds.server.events.EventPublisher;
@@ -257,7 +257,7 @@ public class TestSCMSafeModeManager {
0.9);
MockNodeManager mockNodeManager = new MockNodeManager(true, 10);
PipelineManager pipelineManager =
- PipelineManagerV2Impl.newPipelineManager(
+ PipelineManagerImpl.newPipelineManager(
conf,
MockSCMHAManager.getInstance(true),
mockNodeManager,
@@ -283,7 +283,7 @@ public class TestSCMSafeModeManager {
200);
MockNodeManager mockNodeManager = new MockNodeManager(true, 10);
PipelineManager pipelineManager =
- PipelineManagerV2Impl.newPipelineManager(
+ PipelineManagerImpl.newPipelineManager(
conf,
MockSCMHAManager.getInstance(true),
mockNodeManager,
@@ -308,7 +308,7 @@ public class TestSCMSafeModeManager {
conf.setDouble(HddsConfigKeys.HDDS_SCM_SAFEMODE_THRESHOLD_PCT, -1.0);
MockNodeManager mockNodeManager = new MockNodeManager(true, 10);
PipelineManager pipelineManager =
- PipelineManagerV2Impl.newPipelineManager(
+ PipelineManagerImpl.newPipelineManager(
conf,
MockSCMHAManager.getInstance(true),
mockNodeManager,
@@ -339,8 +339,8 @@ public class TestSCMSafeModeManager {
containers.addAll(HddsTestUtils.getContainerInfo(containerCount));
MockNodeManager mockNodeManager = new MockNodeManager(true, nodeCount);
- PipelineManagerV2Impl pipelineManager =
- PipelineManagerV2Impl.newPipelineManager(
+ PipelineManagerImpl pipelineManager =
+ PipelineManagerImpl.newPipelineManager(
conf,
MockSCMHAManager.getInstance(true),
mockNodeManager,
@@ -593,8 +593,8 @@ public class TestSCMSafeModeManager {
config.setBoolean(
HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, true);
- PipelineManagerV2Impl pipelineManager =
- PipelineManagerV2Impl.newPipelineManager(
+ PipelineManagerImpl pipelineManager =
+ PipelineManagerImpl.newPipelineManager(
config,
MockSCMHAManager.getInstance(true),
nodeManager,
@@ -658,8 +658,8 @@ public class TestSCMSafeModeManager {
config.setBoolean(
HddsConfigKeys.HDDS_SCM_SAFEMODE_PIPELINE_AVAILABILITY_CHECK, true);
- PipelineManagerV2Impl pipelineManager =
- PipelineManagerV2Impl.newPipelineManager(
+ PipelineManagerImpl pipelineManager =
+ PipelineManagerImpl.newPipelineManager(
config,
MockSCMHAManager.getInstance(true),
nodeManager,
diff --git
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java
index 72e0a76..1cf9794 100644
---
a/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java
+++
b/hadoop-ozone/recon/src/main/java/org/apache/hadoop/ozone/recon/scm/ReconPipelineManager.java
@@ -30,7 +30,7 @@ import org.apache.hadoop.hdds.scm.node.NodeManager;
import org.apache.hadoop.hdds.scm.pipeline.Pipeline;
import org.apache.hadoop.hdds.scm.pipeline.PipelineFactory;
import org.apache.hadoop.hdds.scm.pipeline.PipelineID;
-import org.apache.hadoop.hdds.scm.pipeline.PipelineManagerV2Impl;
+import org.apache.hadoop.hdds.scm.pipeline.PipelineManagerImpl;
import org.apache.hadoop.hdds.scm.pipeline.PipelineStateManagerV2Impl;
import org.apache.hadoop.hdds.scm.pipeline.StateManager;
import org.apache.hadoop.hdds.server.events.EventPublisher;
@@ -47,7 +47,7 @@ import static
org.apache.hadoop.hdds.scm.pipeline.Pipeline.PipelineState.CLOSED;
/**
* Recon's overriding implementation of SCM's Pipeline Manager.
*/
-public final class ReconPipelineManager extends PipelineManagerV2Impl {
+public final class ReconPipelineManager extends PipelineManagerImpl {
private static final Logger LOG =
LoggerFactory.getLogger(ReconPipelineManager.class);
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]