This is an automated email from the ASF dual-hosted git repository. sodonnell pushed a commit to branch HDDS-3816-ec in repository https://gitbox.apache.org/repos/asf/ozone.git
commit e08d5009ab99f8c0e315823753e1699fe64f1c7b Author: S O'Donnell <[email protected]> AuthorDate: Fri Sep 10 15:34:01 2021 +0100 Removed SCMPipelineManager.java which should have been removed via HDDS-5554, but was somehow missing on an earlier master to ec branch merge. --- .../hdds/scm/pipeline/SCMPipelineManager.java | 842 --------------------- 1 file changed, 842 deletions(-) diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java deleted file mode 100644 index dba57df..0000000 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java +++ /dev/null @@ -1,842 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hadoop.hdds.scm.pipeline; - -import javax.management.ObjectName; -import java.io.IOException; -import java.time.Duration; -import java.time.Instant; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.List; -import java.util.Map; -import java.util.NavigableSet; -import java.util.Set; -import java.util.concurrent.TimeUnit; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.locks.ReadWriteLock; -import java.util.concurrent.locks.ReentrantReadWriteLock; - -import com.google.common.annotations.VisibleForTesting; -import org.apache.hadoop.hdds.HddsConfigKeys; -import org.apache.hadoop.hdds.client.RatisReplicationConfig; -import org.apache.hadoop.hdds.client.ReplicationConfig; -import org.apache.hadoop.hdds.conf.ConfigurationSource; -import org.apache.hadoop.hdds.protocol.DatanodeDetails; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; -import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; -import org.apache.hadoop.hdds.scm.ScmConfigKeys; -import org.apache.hadoop.hdds.scm.container.ContainerID; -import org.apache.hadoop.hdds.scm.events.SCMEvents; -import org.apache.hadoop.hdds.scm.exceptions.SCMException; -import org.apache.hadoop.hdds.scm.ha.SCMContext; -import org.apache.hadoop.hdds.scm.node.NodeManager; -import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus; -import org.apache.hadoop.hdds.server.events.EventHandler; -import org.apache.hadoop.hdds.server.events.EventPublisher; -import org.apache.hadoop.hdds.utils.Scheduler; -import org.apache.hadoop.hdds.utils.db.Table; -import org.apache.hadoop.hdds.utils.db.Table.KeyValue; -import org.apache.hadoop.hdds.utils.db.TableIterator; -import org.apache.hadoop.metrics2.util.MBeans; -import org.apache.hadoop.util.Time; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE; - -/** - * Implements api needed for management of pipelines. All the write operations - * for pipelines must come via PipelineManager. It synchronises all write - * and read operations via a ReadWriteLock. - */ -public class SCMPipelineManager implements - PipelineManager, EventHandler<SafeModeStatus> { - - private static final Logger LOG = - LoggerFactory.getLogger(SCMPipelineManager.class); - - private final ReadWriteLock lock; - private PipelineFactory pipelineFactory; - private StateManager stateManager; - private final BackgroundPipelineCreator backgroundPipelineCreator; - private Scheduler scheduler; - - private final EventPublisher eventPublisher; - private final NodeManager nodeManager; - private final SCMPipelineMetrics metrics; - private final ConfigurationSource conf; - private long pipelineWaitDefaultTimeout; - // Pipeline Manager MXBean - private ObjectName pmInfoBean; - - private Table<PipelineID, Pipeline> pipelineStore; - - private final AtomicBoolean isInSafeMode; - // Used to track if the safemode pre-checks have completed. This is designed - // to prevent pipelines being created until sufficient nodes have registered. - private final AtomicBoolean pipelineCreationAllowed; - - // This allows for freezing/resuming the new pipeline creation while the - // SCM is already out of SafeMode. - private AtomicBoolean freezePipelineCreation; - - public SCMPipelineManager(ConfigurationSource conf, - NodeManager nodeManager, - Table<PipelineID, Pipeline> pipelineStore, - EventPublisher eventPublisher) - throws IOException { - this(conf, nodeManager, pipelineStore, eventPublisher, null, null); - this.stateManager = new PipelineStateManager(); - this.pipelineFactory = new PipelineFactory(nodeManager, - stateManager, conf, eventPublisher, SCMContext.emptyContext()); - this.pipelineStore = pipelineStore; - initializePipelineState(); - } - - protected SCMPipelineManager(ConfigurationSource conf, - NodeManager nodeManager, - Table<PipelineID, Pipeline> pipelineStore, - EventPublisher eventPublisher, - PipelineStateManager pipelineStateManager, - PipelineFactory pipelineFactory) - throws IOException { - this.lock = new ReentrantReadWriteLock(); - this.pipelineStore = pipelineStore; - this.conf = conf; - this.pipelineFactory = pipelineFactory; - this.stateManager = pipelineStateManager; - // TODO: See if thread priority needs to be set for these threads - scheduler = new Scheduler("RatisPipelineUtilsThread", false, 1); - this.backgroundPipelineCreator = - new BackgroundPipelineCreator(this, scheduler, conf); - this.eventPublisher = eventPublisher; - this.nodeManager = nodeManager; - this.metrics = SCMPipelineMetrics.create(); - this.pmInfoBean = MBeans.register("SCMPipelineManager", - "SCMPipelineManagerInfo", this); - this.pipelineWaitDefaultTimeout = conf.getTimeDuration( - HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL, - HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL_DEFAULT, - TimeUnit.MILLISECONDS); - this.isInSafeMode = new AtomicBoolean(conf.getBoolean( - HddsConfigKeys.HDDS_SCM_SAFEMODE_ENABLED, - HddsConfigKeys.HDDS_SCM_SAFEMODE_ENABLED_DEFAULT)); - // Pipeline creation is only allowed after the safemode prechecks have - // passed, eg sufficient nodes have registered. - this.pipelineCreationAllowed = new AtomicBoolean(!this.isInSafeMode.get()); - // controls freezing/resuming pipeline creation regardless of SafeMode - // status. - this.freezePipelineCreation = new AtomicBoolean(false); - } - - public StateManager getStateManager() { - return stateManager; - } - - @VisibleForTesting - public void setPipelineProvider(ReplicationType replicationType, - PipelineProvider provider) { - pipelineFactory.setProvider(replicationType, provider); - } - - @VisibleForTesting - public void allowPipelineCreation() { - this.pipelineCreationAllowed.set(true); - } - - @VisibleForTesting - public boolean isPipelineCreationAllowed() { - return pipelineCreationAllowed.get(); - } - - protected void initializePipelineState() throws IOException { - if (pipelineStore.isEmpty()) { - LOG.info("No pipeline exists in current db"); - return; - } - TableIterator<PipelineID, ? extends KeyValue<PipelineID, Pipeline>> - iterator = pipelineStore.iterator(); - while (iterator.hasNext()) { - Pipeline pipeline = nextPipelineFromIterator(iterator); - stateManager.addPipeline(pipeline); - nodeManager.addPipeline(pipeline); - } - } - - private Pipeline nextPipelineFromIterator( - TableIterator<PipelineID, ? extends KeyValue<PipelineID, Pipeline>> it - ) throws IOException { - KeyValue<PipelineID, Pipeline> actual = it.next(); - Pipeline pipeline = actual.getValue(); - PipelineID pipelineID = actual.getKey(); - checkKeyAndReplaceIfObsolete(it, pipeline, pipelineID); - return pipeline; - } - - /** - * This method is part of the change that happens in HDDS-3925, and we can - * and should remove this on later on. - * The purpose of the change is to get rid of protobuf serialization in the - * SCM database Pipeline table keys. The keys are not used anywhere, and the - * PipelineID that is used as a key is in the value as well, so we can detect - * a change in the key translation to byte[] and if we have the old format - * we refresh the table contents during SCM startup. - * - * If this fails in the remove, then there is an IOException coming from - * RocksDB itself, in this case in memory structures will still be fine and - * SCM should be operational, however we will attempt to replace the old key - * at next startup. In this case removing of the pipeline will leave the - * pipeline in RocksDB, and during next startup we will attempt to delete it - * again. This does not affect any runtime operations. - * If a Pipeline should have been deleted but remained in RocksDB, then at - * next startup it will be replaced and added with the new key, then SCM will - * detect that it is an invalid Pipeline and successfully delete it with the - * new key. - * For further info check the JIRA. - * - * @param it the iterator used to iterate the Pipeline table - * @param pipeline the pipeline read already from the iterator - * @param pipelineID the pipeline ID read from the raw data via the iterator - */ - private void checkKeyAndReplaceIfObsolete( - TableIterator<PipelineID, ? extends KeyValue<PipelineID, Pipeline>> it, - Pipeline pipeline, - PipelineID pipelineID - ) { - if (!pipelineID.equals(pipeline.getId())) { - try { - LOG.info("Found pipeline in old format key : {}", pipeline.getId()); - it.removeFromDB(); - pipelineStore.put(pipeline.getId(), pipeline); - } catch (IOException e) { - LOG.info("Pipeline table in RocksDB has an old key format, and " - + "removing the pipeline with the old key was unsuccessful." - + "Pipeline: {}", pipeline); - } - } - } - - private void recordMetricsForPipeline(Pipeline pipeline) { - metrics.incNumPipelineAllocated(); - if (pipeline.isOpen()) { - metrics.incNumPipelineCreated(); - metrics.createPerPipelineMetrics(pipeline); - } - switch (pipeline.getType()) { - case STAND_ALONE: - return; - case RATIS: - List<Pipeline> overlapPipelines = RatisPipelineUtils - .checkPipelineContainSameDatanodes(stateManager, pipeline); - if (!overlapPipelines.isEmpty()) { - // Count 1 overlap at a time. - metrics.incNumPipelineContainSameDatanodes(); - //TODO remove until pipeline allocation is proved equally distributed. - for (Pipeline overlapPipeline : overlapPipelines) { - LOG.info("Pipeline: " + pipeline.getId().toString() + - " contains same datanodes as previous pipelines: " + - overlapPipeline.getId().toString() + " nodeIds: " + - pipeline.getNodes().get(0).getUuid().toString() + - ", " + pipeline.getNodes().get(1).getUuid().toString() + - ", " + pipeline.getNodes().get(2).getUuid().toString()); - } - } - return; - case CHAINED: - // Not supported. - default: - // Not supported. - return; - } - } - - @Override - public Pipeline createPipeline(ReplicationConfig replicationConfig) - throws IOException { - return createPipeline(replicationConfig, Collections.emptyList(), - Collections.emptyList()); - } - - @Override - public Pipeline createPipeline(ReplicationConfig replicationConfig, - List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes) - throws IOException { - if (!isPipelineCreationAllowed() - && replicationConfig.getRequiredNodes() != 1) { - LOG.debug("Pipeline creation is not allowed until safe mode prechecks " + - "complete"); - throw new IOException("Pipeline creation is not allowed as safe mode " + - "prechecks have not yet passed"); - } - if (freezePipelineCreation.get()) { - LOG.debug("Pipeline creation is frozen while an upgrade is in " + - "progress"); - throw new IOException("Pipeline creation is frozen while an upgrade " + - "is in progress"); - } - lock.writeLock().lock(); - try { - Pipeline pipeline = pipelineFactory.create(replicationConfig, - excludedNodes, favoredNodes); - if (pipelineStore != null) { - pipelineStore.put(pipeline.getId(), pipeline); - } - stateManager.addPipeline(pipeline); - nodeManager.addPipeline(pipeline); - recordMetricsForPipeline(pipeline); - return pipeline; - } catch (IOException ex) { - if (ex instanceof SCMException && - ((SCMException) ex).getResult() == FAILED_TO_FIND_SUITABLE_NODE) { - // Avoid spam SCM log with errors when SCM has enough open pipelines - LOG.debug("Can't create more pipelines of replicationConfig: {}. " + - "Reason: {}", replicationConfig, ex.getMessage()); - } else { - LOG.error("Failed to create pipeline of replicationConfig: {}. " + - "Exception: {}", replicationConfig, ex.getMessage()); - } - metrics.incNumPipelineCreationFailed(); - throw ex; - } finally { - lock.writeLock().unlock(); - } - } - - @Override - public Pipeline createPipeline(ReplicationConfig replicationConfig, - List<DatanodeDetails> nodes) { - // This will mostly be used to create dummy pipeline for SimplePipelines. - // We don't update the metrics for SimplePipelines. - lock.writeLock().lock(); - try { - return pipelineFactory.create(replicationConfig, nodes); - } finally { - lock.writeLock().unlock(); - } - } - - @Override - public Pipeline getPipeline(PipelineID pipelineID) - throws PipelineNotFoundException { - lock.readLock().lock(); - try { - return stateManager.getPipeline(pipelineID); - } finally { - lock.readLock().unlock(); - } - } - - @Override - public boolean containsPipeline(PipelineID pipelineID) { - lock.readLock().lock(); - try { - getPipeline(pipelineID); - return true; - } catch (PipelineNotFoundException e) { - return false; - } finally { - lock.readLock().unlock(); - } - } - - @Override - public List<Pipeline> getPipelines() { - lock.readLock().lock(); - try { - return stateManager.getPipelines(); - } finally { - lock.readLock().unlock(); - } - } - - @Override - public List<Pipeline> getPipelines(ReplicationConfig replicationConfig) { - lock.readLock().lock(); - try { - return stateManager.getPipelines(replicationConfig); - } finally { - lock.readLock().unlock(); - } - } - - @Override - public List<Pipeline> getPipelines(ReplicationConfig replicationConfig, - Pipeline.PipelineState state) { - lock.readLock().lock(); - try { - return stateManager.getPipelines(replicationConfig, state); - } finally { - lock.readLock().unlock(); - } - } - - @Override - public List<Pipeline> getPipelines(ReplicationConfig replicationConfig, - Pipeline.PipelineState state, - Collection<DatanodeDetails> excludeDns, - Collection<PipelineID> excludePipelines) { - lock.readLock().lock(); - try { - return stateManager - .getPipelines(replicationConfig, state, excludeDns, excludePipelines); - } finally { - lock.readLock().unlock(); - } - } - - @Override - public int getPipelineCount(ReplicationConfig replicationConfig, - Pipeline.PipelineState state) { - lock.readLock().lock(); - try { - return stateManager.getPipelineCount(replicationConfig, state); - } finally { - lock.readLock().unlock(); - } - } - - @Override - public void addContainerToPipeline(PipelineID pipelineID, - ContainerID containerID) throws IOException { - lock.writeLock().lock(); - try { - stateManager.addContainerToPipeline(pipelineID, containerID); - } finally { - lock.writeLock().unlock(); - } - } - - private void updatePipelineStateInDb(PipelineID pipelineId, - Pipeline.PipelineState oldState) - throws IOException { - // null check is here to prevent the case where SCM store - // is closed but the staleNode handlers/pipleine creations - // still try to access it. - if (pipelineStore != null) { - try { - pipelineStore.put(pipelineId, getPipeline(pipelineId)); - } catch (IOException ex) { - LOG.warn("Pipeline {} state update failed", pipelineId); - // revert back to old state in memory - stateManager.updatePipelineState(pipelineId, oldState); - } - } - } - - @Override - public void removeContainerFromPipeline(PipelineID pipelineID, - ContainerID containerID) throws IOException { - lock.writeLock().lock(); - try { - stateManager.removeContainerFromPipeline(pipelineID, containerID); - } finally { - lock.writeLock().unlock(); - } - } - - @Override - public NavigableSet<ContainerID> getContainersInPipeline( - PipelineID pipelineID) throws IOException { - lock.readLock().lock(); - try { - return stateManager.getContainers(pipelineID); - } finally { - lock.readLock().unlock(); - } - } - - @Override - public int getNumberOfContainers(PipelineID pipelineID) throws IOException { - return stateManager.getNumberOfContainers(pipelineID); - } - - @Override - public void openPipeline(PipelineID pipelineId) throws IOException { - lock.writeLock().lock(); - try { - Pipeline.PipelineState state = stateManager. - getPipeline(pipelineId).getPipelineState(); - Pipeline pipeline = stateManager.openPipeline(pipelineId); - updatePipelineStateInDb(pipelineId, state); - metrics.incNumPipelineCreated(); - metrics.createPerPipelineMetrics(pipeline); - } finally { - lock.writeLock().unlock(); - } - } - - /** - * Fire events to close all containers related to the input pipeline. - * @param pipelineId - ID of the pipeline. - * @throws IOException - */ - protected void closeContainersForPipeline(final PipelineID pipelineId) - throws IOException { - Set<ContainerID> containerIDs = stateManager.getContainers(pipelineId); - for (ContainerID containerID : containerIDs) { - eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerID); - } - } - - /** - * put pipeline in CLOSED state. - * @param pipeline - ID of the pipeline. - * @param onTimeout - whether to remove pipeline after some time. - * @throws IOException - */ - @Override - public void closePipeline(Pipeline pipeline, boolean onTimeout) - throws IOException { - PipelineID pipelineID = pipeline.getId(); - lock.writeLock().lock(); - try { - if (!pipeline.isClosed()) { - stateManager.updatePipelineState(pipelineID, - Pipeline.PipelineState.CLOSED); - LOG.info("Pipeline {} moved to CLOSED state", pipeline); - } - metrics.removePipelineMetrics(pipelineID); - } finally { - lock.writeLock().unlock(); - } - // close containers. - closeContainersForPipeline(pipelineID); - if (!onTimeout) { - // close pipeline right away. - removePipeline(pipeline); - } - } - - /** - * Scrub pipelines. - */ - @Override - public void scrubPipeline(ReplicationConfig replicationConfig) - throws IOException { - if (!RatisReplicationConfig.hasFactor(replicationConfig, - ReplicationFactor.THREE)) { - // Only srub pipeline for RATIS THREE pipeline - return; - } - Instant currentTime = Instant.now(); - Long pipelineScrubTimeoutInMills = conf.getTimeDuration( - ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, - ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT_DEFAULT, - TimeUnit.MILLISECONDS); - - List<Pipeline> candidates = stateManager.getPipelines(replicationConfig); - - for (Pipeline p : candidates) { - // scrub pipelines who stay ALLOCATED for too long. - if (p.getPipelineState() == Pipeline.PipelineState.ALLOCATED && - (currentTime.toEpochMilli() - p.getCreationTimestamp() - .toEpochMilli() >= pipelineScrubTimeoutInMills)) { - LOG.info("Scrubbing pipeline: id: " + p.getId().toString() + - " since it stays at ALLOCATED stage for " + - Duration.between(currentTime, p.getCreationTimestamp()) - .toMinutes() + " mins."); - closePipeline(p, false); - } - // scrub pipelines who stay CLOSED for too long. - if (p.getPipelineState() == Pipeline.PipelineState.CLOSED) { - LOG.info("Scrubbing pipeline: id: " + p.getId().toString() + - " since it is at CLOSED stage."); - closeContainersForPipeline(p.getId()); - removePipeline(p); - } - } - return; - } - - @Override - public Map<String, Integer> getPipelineInfo() { - final Map<String, Integer> pipelineInfo = new HashMap<>(); - for (Pipeline.PipelineState state : Pipeline.PipelineState.values()) { - pipelineInfo.put(state.toString(), 0); - } - stateManager.getPipelines().forEach(pipeline -> - pipelineInfo.computeIfPresent( - pipeline.getPipelineState().toString(), (k, v) -> v + 1)); - return pipelineInfo; - } - - /** - * Schedules a fixed interval job to create pipelines. - */ - @Override - public void startPipelineCreator() { - backgroundPipelineCreator.startFixedIntervalPipelineCreator(); - } - - /** - * Triggers pipeline creation after the specified time. - */ - @Override - public void triggerPipelineCreation() { - backgroundPipelineCreator.triggerPipelineCreation(); - } - - /** - * Activates a dormant pipeline. - * - * @param pipelineID ID of the pipeline to activate. - * @throws IOException in case of any Exception - */ - @Override - public void activatePipeline(PipelineID pipelineID) - throws IOException { - lock.writeLock().lock(); - try { - Pipeline.PipelineState state = stateManager. - getPipeline(pipelineID).getPipelineState(); - stateManager.activatePipeline(pipelineID); - updatePipelineStateInDb(pipelineID, state); - } finally { - lock.writeLock().unlock(); - } - } - - /** - * Deactivates an active pipeline. - * - * @param pipelineID ID of the pipeline to deactivate. - * @throws IOException in case of any Exception - */ - @Override - public void deactivatePipeline(PipelineID pipelineID) - throws IOException { - lock.writeLock().lock(); - try { - Pipeline.PipelineState state = stateManager. - getPipeline(pipelineID).getPipelineState(); - stateManager.deactivatePipeline(pipelineID); - updatePipelineStateInDb(pipelineID, state); - } finally { - lock.writeLock().unlock(); - } - } - - /** - * Wait a pipeline to be OPEN. - * - * @param pipelineID ID of the pipeline to wait for. - * @param timeout wait timeout, millisecond, 0 to use default value - * @throws IOException in case of any Exception, such as timeout - */ - @Override - public void waitPipelineReady(PipelineID pipelineID, long timeout) - throws IOException { - long st = Time.monotonicNow(); - if (timeout == 0) { - timeout = pipelineWaitDefaultTimeout; - } - - boolean ready; - Pipeline pipeline; - do { - try { - pipeline = stateManager.getPipeline(pipelineID); - } catch (PipelineNotFoundException e) { - throw new PipelineNotFoundException(String.format( - "Pipeline %s cannot be found", pipelineID)); - } - ready = pipeline.isOpen(); - if (!ready) { - try { - Thread.sleep((long)100); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - } - } - } while (!ready && Time.monotonicNow() - st < timeout); - - if (!ready) { - throw new IOException(String.format("Pipeline %s is not ready in %d ms", - pipelineID, timeout)); - } - } - - /** - * Removes the pipeline from the db and pipeline state map. - * - * @param pipeline - pipeline to be removed - * @throws IOException - */ - protected void removePipeline(Pipeline pipeline) throws IOException { - pipelineFactory.close(pipeline.getType(), pipeline); - PipelineID pipelineID = pipeline.getId(); - lock.writeLock().lock(); - try { - if (pipelineStore != null) { - pipelineStore.delete(pipelineID); - Pipeline pipelineRemoved = stateManager.removePipeline(pipelineID); - nodeManager.removePipeline(pipelineRemoved); - metrics.incNumPipelineDestroyed(); - } - } catch (IOException ex) { - metrics.incNumPipelineDestroyFailed(); - throw ex; - } finally { - lock.writeLock().unlock(); - } - } - - @Override - public void incNumBlocksAllocatedMetric(PipelineID id) { - metrics.incNumBlocksAllocated(id); - } - - @Override - public void close() throws IOException { - if (scheduler != null) { - scheduler.close(); - scheduler = null; - } - - if(pmInfoBean != null) { - MBeans.unregister(this.pmInfoBean); - pmInfoBean = null; - } - - SCMPipelineMetrics.unRegister(); - - // shutdown pipeline provider. - pipelineFactory.shutdown(); - lock.writeLock().lock(); - try { - if (pipelineStore != null) { - pipelineStore.close(); - pipelineStore = null; - } - } catch (Exception ex) { - LOG.error("Pipeline store close failed", ex); - } finally { - lock.writeLock().unlock(); - } - } - - /** - * returns min number of healthy volumes from the set of - * datanodes constituting the pipeline. - * @param pipeline - * @return healthy volume count - */ - @Override - public int minHealthyVolumeNum(Pipeline pipeline) { - return nodeManager.minHealthyVolumeNum(pipeline.getNodes()); - } - - /** - * returns max count of raft log volumes from the set of - * datanodes constituting the pipeline. - * @param pipeline - * @return healthy volume count - */ - @Override - public int minPipelineLimit(Pipeline pipeline) { - return nodeManager.minPipelineLimit(pipeline.getNodes()); - } - - protected ReadWriteLock getLock() { - return lock; - } - - @VisibleForTesting - public PipelineFactory getPipelineFactory() { - return pipelineFactory; - } - - protected NodeManager getNodeManager() { - return nodeManager; - } - - @Override - public boolean getSafeModeStatus() { - return this.isInSafeMode.get(); - } - - @Override - public void reinitialize(Table<PipelineID, Pipeline> store) - throws IOException { - throw new RuntimeException("Not supported operation."); - } - - @Override - public void freezePipelineCreation() { - freezePipelineCreation.set(true); - backgroundPipelineCreator.pause(); - } - - @Override - public void resumePipelineCreation() { - freezePipelineCreation.set(false); - backgroundPipelineCreator.resume(); - } - - @Override - public void acquireReadLock() { - - } - - @Override - public void releaseReadLock() { - - } - - @Override - public void acquireWriteLock() { - - } - - @Override - public void releaseWriteLock() { - - } - - public Table<PipelineID, Pipeline> getPipelineStore() { - return pipelineStore; - } - - @Override - public void onMessage(SafeModeStatus status, - EventPublisher publisher) { - // TODO: #CLUTIL - handle safemode getting re-enabled - boolean currentAllowPipelines = - pipelineCreationAllowed.getAndSet(status.isPreCheckComplete()); - boolean currentlyInSafeMode = - isInSafeMode.getAndSet(status.isInSafeMode()); - - // Trigger pipeline creation only if the preCheck status has changed to - // complete. - if (isPipelineCreationAllowed() && !currentAllowPipelines) { - triggerPipelineCreation(); - } - // Start the pipeline creation thread only when safemode switches off - if (!getSafeModeStatus() && currentlyInSafeMode) { - startPipelineCreator(); - } - } - - @VisibleForTesting - protected static Logger getLog() { - return LOG; - } -} \ No newline at end of file --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
