This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit e08d5009ab99f8c0e315823753e1699fe64f1c7b
Author: S O'Donnell <[email protected]>
AuthorDate: Fri Sep 10 15:34:01 2021 +0100

    Removed SCMPipelineManager.java, which should have been removed via
    HDDS-5554 but was somehow missed in an earlier master to ec branch merge.
---
 .../hdds/scm/pipeline/SCMPipelineManager.java      | 842 ---------------------
 1 file changed, 842 deletions(-)

diff --git a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
deleted file mode 100644
index dba57df..0000000
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ /dev/null
@@ -1,842 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hadoop.hdds.scm.pipeline;
-
-import javax.management.ObjectName;
-import java.io.IOException;
-import java.time.Duration;
-import java.time.Instant;
-import java.util.Collection;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.NavigableSet;
-import java.util.Set;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import com.google.common.annotations.VisibleForTesting;
-import org.apache.hadoop.hdds.HddsConfigKeys;
-import org.apache.hadoop.hdds.client.RatisReplicationConfig;
-import org.apache.hadoop.hdds.client.ReplicationConfig;
-import org.apache.hadoop.hdds.conf.ConfigurationSource;
-import org.apache.hadoop.hdds.protocol.DatanodeDetails;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
-import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
-import org.apache.hadoop.hdds.scm.ScmConfigKeys;
-import org.apache.hadoop.hdds.scm.container.ContainerID;
-import org.apache.hadoop.hdds.scm.events.SCMEvents;
-import org.apache.hadoop.hdds.scm.exceptions.SCMException;
-import org.apache.hadoop.hdds.scm.ha.SCMContext;
-import org.apache.hadoop.hdds.scm.node.NodeManager;
-import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
-import org.apache.hadoop.hdds.server.events.EventHandler;
-import org.apache.hadoop.hdds.server.events.EventPublisher;
-import org.apache.hadoop.hdds.utils.Scheduler;
-import org.apache.hadoop.hdds.utils.db.Table;
-import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
-import org.apache.hadoop.hdds.utils.db.TableIterator;
-import org.apache.hadoop.metrics2.util.MBeans;
-import org.apache.hadoop.util.Time;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE;
-
-/**
- * Implements the API needed for management of pipelines. All write
- * operations on pipelines must come via PipelineManager. It synchronises
- * all read and write operations via a ReadWriteLock.
- */
-public class SCMPipelineManager implements
-    PipelineManager, EventHandler<SafeModeStatus> {
-
-  private static final Logger LOG =
-      LoggerFactory.getLogger(SCMPipelineManager.class);
-
-  private final ReadWriteLock lock;
-  private PipelineFactory pipelineFactory;
-  private StateManager stateManager;
-  private final BackgroundPipelineCreator backgroundPipelineCreator;
-  private Scheduler scheduler;
-
-  private final EventPublisher eventPublisher;
-  private final NodeManager nodeManager;
-  private final SCMPipelineMetrics metrics;
-  private final ConfigurationSource conf;
-  private long pipelineWaitDefaultTimeout;
-  // Pipeline Manager MXBean
-  private ObjectName pmInfoBean;
-
-  private Table<PipelineID, Pipeline> pipelineStore;
-
-  private final AtomicBoolean isInSafeMode;
-  // Used to track if the safemode pre-checks have completed; this prevents
-  // pipelines from being created until sufficient nodes have registered.
-  private final AtomicBoolean pipelineCreationAllowed;
-
-  // This allows for freezing/resuming new pipeline creation even while the
-  // SCM is already out of SafeMode.
-  private AtomicBoolean freezePipelineCreation;
-
-  public SCMPipelineManager(ConfigurationSource conf,
-      NodeManager nodeManager,
-      Table<PipelineID, Pipeline> pipelineStore,
-      EventPublisher eventPublisher)
-      throws IOException {
-    this(conf, nodeManager, pipelineStore, eventPublisher, null, null);
-    this.stateManager = new PipelineStateManager();
-    this.pipelineFactory = new PipelineFactory(nodeManager,
-        stateManager, conf, eventPublisher, SCMContext.emptyContext());
-    this.pipelineStore = pipelineStore;
-    initializePipelineState();
-  }
-
-  protected SCMPipelineManager(ConfigurationSource conf,
-      NodeManager nodeManager,
-      Table<PipelineID, Pipeline> pipelineStore,
-      EventPublisher eventPublisher,
-      PipelineStateManager pipelineStateManager,
-      PipelineFactory pipelineFactory)
-      throws IOException {
-    this.lock = new ReentrantReadWriteLock();
-    this.pipelineStore = pipelineStore;
-    this.conf = conf;
-    this.pipelineFactory = pipelineFactory;
-    this.stateManager = pipelineStateManager;
-    // TODO: See if thread priority needs to be set for these threads
-    scheduler = new Scheduler("RatisPipelineUtilsThread", false, 1);
-    this.backgroundPipelineCreator =
-        new BackgroundPipelineCreator(this, scheduler, conf);
-    this.eventPublisher = eventPublisher;
-    this.nodeManager = nodeManager;
-    this.metrics = SCMPipelineMetrics.create();
-    this.pmInfoBean = MBeans.register("SCMPipelineManager",
-        "SCMPipelineManagerInfo", this);
-    this.pipelineWaitDefaultTimeout = conf.getTimeDuration(
-        HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL,
-        HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL_DEFAULT,
-        TimeUnit.MILLISECONDS);
-    this.isInSafeMode = new AtomicBoolean(conf.getBoolean(
-        HddsConfigKeys.HDDS_SCM_SAFEMODE_ENABLED,
-        HddsConfigKeys.HDDS_SCM_SAFEMODE_ENABLED_DEFAULT));
-    // Pipeline creation is only allowed after the safemode prechecks have
-    // passed, e.g. once sufficient nodes have registered.
-    this.pipelineCreationAllowed = new AtomicBoolean(!this.isInSafeMode.get());
-    // controls freezing/resuming pipeline creation regardless of SafeMode
-    // status.
-    this.freezePipelineCreation = new AtomicBoolean(false);
-  }
-
-  public StateManager getStateManager() {
-    return stateManager;
-  }
-
-  @VisibleForTesting
-  public void setPipelineProvider(ReplicationType replicationType,
-                                  PipelineProvider provider) {
-    pipelineFactory.setProvider(replicationType, provider);
-  }
-
-  @VisibleForTesting
-  public void allowPipelineCreation() {
-    this.pipelineCreationAllowed.set(true);
-  }
-
-  @VisibleForTesting
-  public boolean isPipelineCreationAllowed() {
-    return pipelineCreationAllowed.get();
-  }
-
-  protected void initializePipelineState() throws IOException {
-    if (pipelineStore.isEmpty()) {
-      LOG.info("No pipeline exists in current db");
-      return;
-    }
-    TableIterator<PipelineID, ? extends KeyValue<PipelineID, Pipeline>>
-        iterator = pipelineStore.iterator();
-    while (iterator.hasNext()) {
-      Pipeline pipeline = nextPipelineFromIterator(iterator);
-      stateManager.addPipeline(pipeline);
-      nodeManager.addPipeline(pipeline);
-    }
-  }
-
-  private Pipeline nextPipelineFromIterator(
-      TableIterator<PipelineID, ? extends KeyValue<PipelineID, Pipeline>> it
-  ) throws IOException {
-    KeyValue<PipelineID, Pipeline> actual = it.next();
-    Pipeline pipeline = actual.getValue();
-    PipelineID pipelineID = actual.getKey();
-    checkKeyAndReplaceIfObsolete(it, pipeline, pipelineID);
-    return pipeline;
-  }
-
-  /**
-   * This method is part of the change made in HDDS-3925, and it can and
-   * should be removed later on.
-   * The purpose of the change is to get rid of protobuf serialization in the
-   * SCM database Pipeline table keys. The keys are not used anywhere, and the
-   * PipelineID that is used as a key is in the value as well, so we can detect
-   * a change in the key translation to byte[], and if we have the old format
-   * we refresh the table contents during SCM startup.
-   *
-   * If the remove fails, there is an IOException coming from RocksDB itself;
-   * in this case the in-memory structures will still be fine and SCM should
-   * be operational, and we will attempt to replace the old key at the next
-   * startup. In this case removing the pipeline will leave it in RocksDB,
-   * and during the next startup we will attempt to delete it again. This
-   * does not affect any runtime operations.
-   * If a Pipeline should have been deleted but remained in RocksDB, then at
-   * the next startup it will be replaced and added with the new key; SCM
-   * will then detect that it is an invalid Pipeline and successfully delete
-   * it with the new key.
-   * For further info check the JIRA.
-   *
-   * @param it the iterator used to iterate the Pipeline table
-   * @param pipeline the pipeline already read from the iterator
-   * @param pipelineID the pipeline ID read from the raw data via the iterator
-   */
-  private void checkKeyAndReplaceIfObsolete(
-      TableIterator<PipelineID, ? extends KeyValue<PipelineID, Pipeline>> it,
-      Pipeline pipeline,
-      PipelineID pipelineID
-  ) {
-    if (!pipelineID.equals(pipeline.getId())) {
-      try {
-        LOG.info("Found pipeline in old format key : {}", pipeline.getId());
-        it.removeFromDB();
-        pipelineStore.put(pipeline.getId(), pipeline);
-      } catch (IOException e) {
-        LOG.info("Pipeline table in RocksDB has an old key format, and "
-            + "removing the pipeline with the old key was unsuccessful."
-            + "Pipeline: {}", pipeline);
-      }
-    }
-  }
-
-  private void recordMetricsForPipeline(Pipeline pipeline) {
-    metrics.incNumPipelineAllocated();
-    if (pipeline.isOpen()) {
-      metrics.incNumPipelineCreated();
-      metrics.createPerPipelineMetrics(pipeline);
-    }
-    switch (pipeline.getType()) {
-    case STAND_ALONE:
-      return;
-    case RATIS:
-      List<Pipeline> overlapPipelines = RatisPipelineUtils
-          .checkPipelineContainSameDatanodes(stateManager, pipeline);
-      if (!overlapPipelines.isEmpty()) {
-        // Count 1 overlap at a time.
-        metrics.incNumPipelineContainSameDatanodes();
-        // TODO: remove once pipeline allocation is proven equally distributed
-        for (Pipeline overlapPipeline : overlapPipelines) {
-          LOG.info("Pipeline: " + pipeline.getId().toString() +
-              " contains same datanodes as previous pipelines: " +
-              overlapPipeline.getId().toString() + " nodeIds: " +
-              pipeline.getNodes().get(0).getUuid().toString() +
-              ", " + pipeline.getNodes().get(1).getUuid().toString() +
-              ", " + pipeline.getNodes().get(2).getUuid().toString());
-        }
-      }
-      return;
-    case CHAINED:
-      // Not supported.
-    default:
-      // Not supported.
-      return;
-    }
-  }
-
-  @Override
-  public Pipeline createPipeline(ReplicationConfig replicationConfig)
-      throws IOException {
-    return createPipeline(replicationConfig, Collections.emptyList(),
-        Collections.emptyList());
-  }
-
-  @Override
-  public Pipeline createPipeline(ReplicationConfig replicationConfig,
-      List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes)
-      throws IOException {
-    if (!isPipelineCreationAllowed()
-        && replicationConfig.getRequiredNodes() != 1) {
-      LOG.debug("Pipeline creation is not allowed until safe mode prechecks " +
-          "complete");
-      throw new IOException("Pipeline creation is not allowed as safe mode " +
-          "prechecks have not yet passed");
-    }
-    if (freezePipelineCreation.get()) {
-      LOG.debug("Pipeline creation is frozen while an upgrade is in " +
-          "progress");
-      throw new IOException("Pipeline creation is frozen while an upgrade " +
-          "is in progress");
-    }
-    lock.writeLock().lock();
-    try {
-      Pipeline pipeline = pipelineFactory.create(replicationConfig,
-          excludedNodes, favoredNodes);
-      if (pipelineStore != null) {
-        pipelineStore.put(pipeline.getId(), pipeline);
-      }
-      stateManager.addPipeline(pipeline);
-      nodeManager.addPipeline(pipeline);
-      recordMetricsForPipeline(pipeline);
-      return pipeline;
-    } catch (IOException ex) {
-      if (ex instanceof SCMException &&
-          ((SCMException) ex).getResult() == FAILED_TO_FIND_SUITABLE_NODE) {
-        // Avoid spamming the SCM log when SCM has enough open pipelines
-        LOG.debug("Can't create more pipelines of replicationConfig: {}. " +
-            "Reason: {}", replicationConfig, ex.getMessage());
-      } else {
-        LOG.error("Failed to create pipeline of replicationConfig: {}. " +
-            "Exception: {}", replicationConfig, ex.getMessage());
-      }
-      metrics.incNumPipelineCreationFailed();
-      throw ex;
-    } finally {
-      lock.writeLock().unlock();
-    }
-  }
-
-  @Override
-  public Pipeline createPipeline(ReplicationConfig replicationConfig,
-      List<DatanodeDetails> nodes) {
-    // This will mostly be used to create dummy pipelines for SimplePipelines.
-    // We don't update the metrics for SimplePipelines.
-    lock.writeLock().lock();
-    try {
-      return pipelineFactory.create(replicationConfig, nodes);
-    } finally {
-      lock.writeLock().unlock();
-    }
-  }
-
-  @Override
-  public Pipeline getPipeline(PipelineID pipelineID)
-      throws PipelineNotFoundException {
-    lock.readLock().lock();
-    try {
-      return stateManager.getPipeline(pipelineID);
-    } finally {
-      lock.readLock().unlock();
-    }
-  }
-
-  @Override
-  public boolean containsPipeline(PipelineID pipelineID) {
-    lock.readLock().lock();
-    try {
-      getPipeline(pipelineID);
-      return true;
-    } catch (PipelineNotFoundException e) {
-      return false;
-    } finally {
-      lock.readLock().unlock();
-    }
-  }
-
-  @Override
-  public List<Pipeline> getPipelines() {
-    lock.readLock().lock();
-    try {
-      return stateManager.getPipelines();
-    } finally {
-      lock.readLock().unlock();
-    }
-  }
-
-  @Override
-  public List<Pipeline> getPipelines(ReplicationConfig replicationConfig) {
-    lock.readLock().lock();
-    try {
-      return stateManager.getPipelines(replicationConfig);
-    } finally {
-      lock.readLock().unlock();
-    }
-  }
-
-  @Override
-  public List<Pipeline> getPipelines(ReplicationConfig replicationConfig,
-      Pipeline.PipelineState state) {
-    lock.readLock().lock();
-    try {
-      return stateManager.getPipelines(replicationConfig, state);
-    } finally {
-      lock.readLock().unlock();
-    }
-  }
-
-  @Override
-  public List<Pipeline> getPipelines(ReplicationConfig replicationConfig,
-      Pipeline.PipelineState state,
-      Collection<DatanodeDetails> excludeDns,
-      Collection<PipelineID> excludePipelines) {
-    lock.readLock().lock();
-    try {
-      return stateManager
-          .getPipelines(replicationConfig, state, excludeDns, excludePipelines);
-    } finally {
-      lock.readLock().unlock();
-    }
-  }
-
-  @Override
-  public int getPipelineCount(ReplicationConfig replicationConfig,
-      Pipeline.PipelineState state) {
-    lock.readLock().lock();
-    try {
-      return stateManager.getPipelineCount(replicationConfig, state);
-    } finally {
-      lock.readLock().unlock();
-    }
-  }
-
-  @Override
-  public void addContainerToPipeline(PipelineID pipelineID,
-      ContainerID containerID) throws IOException {
-    lock.writeLock().lock();
-    try {
-      stateManager.addContainerToPipeline(pipelineID, containerID);
-    } finally {
-      lock.writeLock().unlock();
-    }
-  }
-
-  private void updatePipelineStateInDb(PipelineID pipelineId,
-                                       Pipeline.PipelineState oldState)
-          throws IOException {
-    // The null check is here to prevent the case where the SCM store
-    // is closed but the staleNode handlers/pipeline creations
-    // still try to access it.
-    if (pipelineStore != null) {
-      try {
-        pipelineStore.put(pipelineId, getPipeline(pipelineId));
-      } catch (IOException ex) {
-        LOG.warn("Pipeline {} state update failed", pipelineId);
-        // revert back to old state in memory
-        stateManager.updatePipelineState(pipelineId, oldState);
-      }
-    }
-  }
-
-  @Override
-  public void removeContainerFromPipeline(PipelineID pipelineID,
-      ContainerID containerID) throws IOException {
-    lock.writeLock().lock();
-    try {
-      stateManager.removeContainerFromPipeline(pipelineID, containerID);
-    } finally {
-      lock.writeLock().unlock();
-    }
-  }
-
-  @Override
-  public NavigableSet<ContainerID> getContainersInPipeline(
-      PipelineID pipelineID) throws IOException {
-    lock.readLock().lock();
-    try {
-      return stateManager.getContainers(pipelineID);
-    } finally {
-      lock.readLock().unlock();
-    }
-  }
-
-  @Override
-  public int getNumberOfContainers(PipelineID pipelineID) throws IOException {
-    return stateManager.getNumberOfContainers(pipelineID);
-  }
-
-  @Override
-  public void openPipeline(PipelineID pipelineId) throws IOException {
-    lock.writeLock().lock();
-    try {
-      Pipeline.PipelineState state = stateManager
-          .getPipeline(pipelineId).getPipelineState();
-      Pipeline pipeline = stateManager.openPipeline(pipelineId);
-      updatePipelineStateInDb(pipelineId, state);
-      metrics.incNumPipelineCreated();
-      metrics.createPerPipelineMetrics(pipeline);
-    } finally {
-      lock.writeLock().unlock();
-    }
-  }
-
-  /**
-   * Fire events to close all containers related to the input pipeline.
-   * @param pipelineId - ID of the pipeline.
-   * @throws IOException
-   */
-  protected void closeContainersForPipeline(final PipelineID pipelineId)
-      throws IOException {
-    Set<ContainerID> containerIDs = stateManager.getContainers(pipelineId);
-    for (ContainerID containerID : containerIDs) {
-      eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerID);
-    }
-  }
-
-  /**
-   * Puts the pipeline in CLOSED state.
-   * @param pipeline - the pipeline to close.
-   * @param onTimeout - whether to remove the pipeline after a delay.
-   * @throws IOException
-   */
-  @Override
-  public void closePipeline(Pipeline pipeline, boolean onTimeout)
-      throws IOException {
-    PipelineID pipelineID = pipeline.getId();
-    lock.writeLock().lock();
-    try {
-      if (!pipeline.isClosed()) {
-        stateManager.updatePipelineState(pipelineID,
-            Pipeline.PipelineState.CLOSED);
-        LOG.info("Pipeline {} moved to CLOSED state", pipeline);
-      }
-      metrics.removePipelineMetrics(pipelineID);
-    } finally {
-      lock.writeLock().unlock();
-    }
-    // close containers.
-    closeContainersForPipeline(pipelineID);
-    if (!onTimeout) {
-      // close pipeline right away.
-      removePipeline(pipeline);
-    }
-  }
-
-  /**
-   * Scrub pipelines.
-   */
-  @Override
-  public void scrubPipeline(ReplicationConfig replicationConfig)
-      throws IOException {
-    if (!RatisReplicationConfig.hasFactor(replicationConfig,
-        ReplicationFactor.THREE)) {
-      // Only scrub RATIS THREE pipelines
-      return;
-    }
-    Instant currentTime = Instant.now();
-    long pipelineScrubTimeoutInMillis = conf.getTimeDuration(
-        ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT,
-        ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT_DEFAULT,
-        TimeUnit.MILLISECONDS);
-
-    List<Pipeline> candidates = stateManager.getPipelines(replicationConfig);
-
-    for (Pipeline p : candidates) {
-      // Scrub pipelines which have stayed ALLOCATED for too long.
-      if (p.getPipelineState() == Pipeline.PipelineState.ALLOCATED &&
-          (currentTime.toEpochMilli() - p.getCreationTimestamp()
-              .toEpochMilli() >= pipelineScrubTimeoutInMillis)) {
-        LOG.info("Scrubbing pipeline: id: " + p.getId().toString() +
-            " since it has stayed at ALLOCATED stage for " +
-            Duration.between(p.getCreationTimestamp(), currentTime)
-                .toMinutes() + " mins.");
-        closePipeline(p, false);
-      }
-      // Scrub pipelines which are in CLOSED state.
-      if (p.getPipelineState() == Pipeline.PipelineState.CLOSED) {
-        LOG.info("Scrubbing pipeline: id: " + p.getId().toString() +
-            " since it is in CLOSED state.");
-        closeContainersForPipeline(p.getId());
-        removePipeline(p);
-      }
-    }
-    return;
-  }
-
-  @Override
-  public Map<String, Integer> getPipelineInfo() {
-    final Map<String, Integer> pipelineInfo = new HashMap<>();
-    for (Pipeline.PipelineState state : Pipeline.PipelineState.values()) {
-      pipelineInfo.put(state.toString(), 0);
-    }
-    stateManager.getPipelines().forEach(pipeline ->
-        pipelineInfo.computeIfPresent(
-            pipeline.getPipelineState().toString(), (k, v) -> v + 1));
-    return pipelineInfo;
-  }
-
-  /**
-   * Schedules a fixed interval job to create pipelines.
-   */
-  @Override
-  public void startPipelineCreator() {
-    backgroundPipelineCreator.startFixedIntervalPipelineCreator();
-  }
-
-  /**
-   * Triggers pipeline creation after the specified time.
-   */
-  @Override
-  public void triggerPipelineCreation() {
-    backgroundPipelineCreator.triggerPipelineCreation();
-  }
-
-  /**
-   * Activates a dormant pipeline.
-   *
-   * @param pipelineID ID of the pipeline to activate.
-   * @throws IOException in case of any Exception
-   */
-  @Override
-  public void activatePipeline(PipelineID pipelineID)
-      throws IOException {
-    lock.writeLock().lock();
-    try {
-      Pipeline.PipelineState state = stateManager
-          .getPipeline(pipelineID).getPipelineState();
-      stateManager.activatePipeline(pipelineID);
-      updatePipelineStateInDb(pipelineID, state);
-    } finally {
-      lock.writeLock().unlock();
-    }
-  }
-
-  /**
-   * Deactivates an active pipeline.
-   *
-   * @param pipelineID ID of the pipeline to deactivate.
-   * @throws IOException in case of any Exception
-   */
-  @Override
-  public void deactivatePipeline(PipelineID pipelineID)
-      throws IOException {
-    lock.writeLock().lock();
-    try {
-      Pipeline.PipelineState state = stateManager
-          .getPipeline(pipelineID).getPipelineState();
-      stateManager.deactivatePipeline(pipelineID);
-      updatePipelineStateInDb(pipelineID, state);
-    } finally {
-      lock.writeLock().unlock();
-    }
-  }
-
-  /**
-   * Waits for a pipeline to become OPEN.
-   *
-   * @param pipelineID ID of the pipeline to wait for.
-   * @param timeout    wait timeout in milliseconds; 0 to use the default value
-   * @throws IOException in case of any Exception, such as timeout
-   */
-  @Override
-  public void waitPipelineReady(PipelineID pipelineID, long timeout)
-      throws IOException {
-    long st = Time.monotonicNow();
-    if (timeout == 0) {
-      timeout = pipelineWaitDefaultTimeout;
-    }
-
-    boolean ready;
-    Pipeline pipeline;
-    do {
-      try {
-        pipeline = stateManager.getPipeline(pipelineID);
-      } catch (PipelineNotFoundException e) {
-        throw new PipelineNotFoundException(String.format(
-            "Pipeline %s cannot be found", pipelineID));
-      }
-      ready = pipeline.isOpen();
-      if (!ready) {
-        try {
-          Thread.sleep(100);
-        } catch (InterruptedException e) {
-          Thread.currentThread().interrupt();
-        }
-      }
-    } while (!ready && Time.monotonicNow() - st < timeout);
-
-    if (!ready) {
-      throw new IOException(String.format("Pipeline %s is not ready in %d ms",
-          pipelineID, timeout));
-    }
-  }
-
-  /**
-   * Removes the pipeline from the db and pipeline state map.
-   *
-   * @param pipeline - pipeline to be removed
-   * @throws IOException
-   */
-  protected void removePipeline(Pipeline pipeline) throws IOException {
-    pipelineFactory.close(pipeline.getType(), pipeline);
-    PipelineID pipelineID = pipeline.getId();
-    lock.writeLock().lock();
-    try {
-      if (pipelineStore != null) {
-        pipelineStore.delete(pipelineID);
-        Pipeline pipelineRemoved = stateManager.removePipeline(pipelineID);
-        nodeManager.removePipeline(pipelineRemoved);
-        metrics.incNumPipelineDestroyed();
-      }
-    } catch (IOException ex) {
-      metrics.incNumPipelineDestroyFailed();
-      throw ex;
-    } finally {
-      lock.writeLock().unlock();
-    }
-  }
-
-  @Override
-  public void incNumBlocksAllocatedMetric(PipelineID id) {
-    metrics.incNumBlocksAllocated(id);
-  }
-
-  @Override
-  public void close() throws IOException {
-    if (scheduler != null) {
-      scheduler.close();
-      scheduler = null;
-    }
-
-    if (pmInfoBean != null) {
-      MBeans.unregister(this.pmInfoBean);
-      pmInfoBean = null;
-    }
-
-    SCMPipelineMetrics.unRegister();
-
-    // shutdown pipeline provider.
-    pipelineFactory.shutdown();
-    lock.writeLock().lock();
-    try {
-      if (pipelineStore != null) {
-        pipelineStore.close();
-        pipelineStore = null;
-      }
-    } catch (Exception ex) {
-      LOG.error("Pipeline  store close failed", ex);
-    } finally {
-      lock.writeLock().unlock();
-    }
-  }
-
-  /**
-   * Returns the minimum number of healthy volumes from the set of
-   * datanodes constituting the pipeline.
-   * @param pipeline the pipeline to check
-   * @return healthy volume count
-   */
-  @Override
-  public int minHealthyVolumeNum(Pipeline pipeline) {
-    return nodeManager.minHealthyVolumeNum(pipeline.getNodes());
-  }
-
-  /**
-   * Returns the minimum pipeline limit from the set of
-   * datanodes constituting the pipeline.
-   * @param pipeline the pipeline to check
-   * @return minimum pipeline limit
-   */
-  @Override
-  public int minPipelineLimit(Pipeline pipeline) {
-    return nodeManager.minPipelineLimit(pipeline.getNodes());
-  }
-
-  protected ReadWriteLock getLock() {
-    return lock;
-  }
-
-  @VisibleForTesting
-  public PipelineFactory getPipelineFactory() {
-    return pipelineFactory;
-  }
-
-  protected NodeManager getNodeManager() {
-    return nodeManager;
-  }
-
-  @Override
-  public boolean getSafeModeStatus() {
-    return this.isInSafeMode.get();
-  }
-
-  @Override
-  public void reinitialize(Table<PipelineID, Pipeline> store)
-      throws IOException {
-    throw new RuntimeException("Operation not supported.");
-  }
-
-  @Override
-  public void freezePipelineCreation() {
-    freezePipelineCreation.set(true);
-    backgroundPipelineCreator.pause();
-  }
-
-  @Override
-  public void resumePipelineCreation() {
-    freezePipelineCreation.set(false);
-    backgroundPipelineCreator.resume();
-  }
-
-  @Override
-  public void acquireReadLock() {
-
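-    // No-op: reads are already guarded by this class's internal ReadWriteLock.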
-  }
-
-  @Override
-  public void releaseReadLock() {
-
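-    // No-op: see acquireReadLock().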
-  }
-
-  @Override
-  public void acquireWriteLock() {
-
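-    // No-op: writes are already guarded by this class's internal ReadWriteLock.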
-  }
-
-  @Override
-  public void releaseWriteLock() {
-
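-    // No-op: see acquireWriteLock().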
-  }
-
-  public Table<PipelineID, Pipeline> getPipelineStore() {
-    return pipelineStore;
-  }
-
-  @Override
-  public void onMessage(SafeModeStatus status,
-                        EventPublisher publisher) {
-    // TODO: #CLUTIL - handle safemode getting re-enabled
-    boolean currentAllowPipelines =
-        pipelineCreationAllowed.getAndSet(status.isPreCheckComplete());
-    boolean currentlyInSafeMode =
-        isInSafeMode.getAndSet(status.isInSafeMode());
-
-    // Trigger pipeline creation only if the preCheck status has changed to
-    // complete.
-    if (isPipelineCreationAllowed() && !currentAllowPipelines) {
-      triggerPipelineCreation();
-    }
-    // Start the pipeline creation thread only when safemode switches off
-    if (!getSafeModeStatus() && currentlyInSafeMode) {
-      startPipelineCreator();
-    }
-  }
-
-  @VisibleForTesting
-  protected static Logger getLog() {
-    return LOG;
-  }
-}
\ No newline at end of file
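
For context on the class just removed: every accessor in SCMPipelineManager
followed the same ReentrantReadWriteLock discipline visible in the diff above
(shared read lock for queries, exclusive write lock for mutations, unlock
always in a finally block). Below is a minimal, self-contained sketch of that
pattern; the class and field names (LockGuardedRegistry, entries) are
hypothetical and not part of the Ozone code base.

    import java.util.ArrayList;
    import java.util.List;
    import java.util.concurrent.locks.ReadWriteLock;
    import java.util.concurrent.locks.ReentrantReadWriteLock;

    public class LockGuardedRegistry {
      private final ReadWriteLock lock = new ReentrantReadWriteLock();
      private final List<String> entries = new ArrayList<>();

      // Mutations take the exclusive write lock, mirroring the shape of
      // createPipeline() and removePipeline() in the removed class.
      public void add(String entry) {
        lock.writeLock().lock();
        try {
          entries.add(entry);
        } finally {
          lock.writeLock().unlock();
        }
      }

      // Queries take the shared read lock, mirroring getPipeline() and
      // getPipelines(); copying under the lock isolates callers from
      // concurrent mutation.
      public List<String> snapshot() {
        lock.readLock().lock();
        try {
          return new ArrayList<>(entries);
        } finally {
          lock.readLock().unlock();
        }
      }
    }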

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
