This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git

commit 2df6c9caf594b6fccfe965984b361e7bc194a01e
Merge: 90c4d92 e392c1a
Author: S O'Donnell <[email protected]>
AuthorDate: Fri Sep 10 15:20:53 2021 +0100

    Merge branch 'master' into ec branch

 .github/workflows/post-commit.yml                  |   2 +-
 .../org/apache/hadoop/hdds/HddsConfigKeys.java     |   5 +
 .../apache/hadoop/hdds/recon/ReconConfigKeys.java  |  10 +
 .../org/apache/hadoop/hdds/scm/ScmConfigKeys.java  |  10 +-
 .../org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java  |   2 +
 .../hadoop/hdds/scm/net/NetworkTopologyImpl.java   |  24 +-
 .../java/org/apache/hadoop/ozone/OzoneConsts.java  |   2 -
 .../upgrade/AbstractLayoutVersionManager.java      | 158 +++--
 .../common/src/main/resources/ozone-default.xml    |  34 +-
 .../hdds/scm/net/TestNetworkTopologyImpl.java      |  33 +-
 .../apache/hadoop/ozone/HddsDatanodeService.java   |   5 +-
 .../container/common/helpers/ContainerUtils.java   |  38 +-
 .../ozone/container/common/impl/ContainerSet.java  |   3 +
 .../container/common/impl/HddsDispatcher.java      |   1 +
 .../common/interfaces/ContainerDispatcher.java     |   7 +-
 .../common/statemachine/DatanodeConfiguration.java |  21 +
 .../common/statemachine/DatanodeStateMachine.java  |   6 +-
 .../CloseContainerCommandHandler.java              |   2 +
 .../ClosePipelineCommandHandler.java               |  13 +-
 .../commandhandler/DeleteBlocksCommandHandler.java |   4 +-
 .../states/endpoint/VersionEndpointTask.java       |   5 +-
 .../server/ratis/ContainerStateMachine.java        |  11 +-
 .../container/common/utils/HddsVolumeUtil.java     |  72 +-
 .../ozone/container/common/volume/HddsVolume.java  |   2 +-
 .../container/common/volume/MutableVolumeSet.java  |  13 -
 .../container/keyvalue/KeyValueContainer.java      |  12 +-
 .../container/keyvalue/KeyValueContainerCheck.java |   4 +-
 .../ozone/container/keyvalue/KeyValueHandler.java  |  52 +-
 .../container/keyvalue/helpers/ChunkUtils.java     |  18 +-
 .../keyvalue/helpers/KeyValueContainerUtil.java    |   9 +-
 .../keyvalue/impl/FilePerBlockStrategy.java        |   9 +-
 .../container/ozoneimpl/ContainerController.java   |  24 +-
 .../ozone/container/ozoneimpl/ContainerReader.java | 161 ++---
 .../ozone/container/ozoneimpl/OzoneContainer.java  |   5 +-
 .../replication/GrpcReplicationClient.java         |   4 +-
 .../container/replication/ReplicationServer.java   |   7 +-
 .../upgrade/DataNodeUpgradeFinalizer.java          |   5 +-
 .../upgrade/DatanodeMetadataFeatures.java          |  53 --
 .../ScmHAFinalizeUpgradeActionDatanode.java        | 129 ++++
 .../ScmHAFirstUpgradeLayoutChangeAction.java       |  45 --
 .../upgrade/VersionedDatanodeFeatures.java         | 145 ++++
 .../hadoop/ozone/container/common/ScmTestMock.java |   9 +
 .../ozone/container/common/TestContainerCache.java |  23 +-
 .../common/TestKeyValueContainerData.java          |   8 +-
 .../common/impl/TestContainerDataYaml.java         |  43 +-
 .../TestCloseContainerCommandHandler.java          |  33 +-
 .../upgrade/TestDatanodeUpgradeToScmHA.java        | 741 +++++++++++++++++++++
 hadoop-hdds/docs/content/concept/OzoneManager.md   |   2 +-
 .../docs/content/concept/OzoneManager.zh.md        |   2 +-
 hadoop-hdds/docs/content/feature/SCM-HA.md         |   4 +-
 hadoop-hdds/docs/content/interface/ReconApi.md     | 217 +++++-
 .../docs/themes/ozonedoc/layouts/index.html        |   2 +
 .../hadoop/hdds/server/http/HttpServer2.java       |  17 +-
 .../hadoop/hdds/server/http/TestHttpServer2.java   |  54 ++
 .../src/main/proto/DatanodeClientProtocol.proto    |   1 +
 .../hadoop/hdds/scm/block/DeletedBlockLogImpl.java |   7 +-
 .../hdds/scm/container/ContainerManagerImpl.java   |  67 +-
 .../scm/container/balancer/ContainerBalancer.java  |   8 +-
 .../balancer/ContainerBalancerConfiguration.java   |  14 +-
 .../replication/ReplicationManagerMetrics.java     |  44 +-
 .../org/apache/hadoop/hdds/scm/ha/RatisUtil.java   |   7 +-
 .../hadoop/hdds/scm/ha/SCMRatisServerImpl.java     |   3 +-
 .../hdds/scm/node/DatanodeAdminMonitorImpl.java    |  18 +
 .../hadoop/hdds/scm/pipeline/PipelineManager.java  |  20 +
 .../hdds/scm/pipeline/PipelineManagerImpl.java     |  51 +-
 .../hdds/scm/pipeline/SCMPipelineManager.java      |  20 +
 .../pipeline/WritableRatisContainerProvider.java   |  99 ++-
 .../hdds/scm/server/StorageContainerManager.java   |  48 +-
 .../TestSCMContainerPlacementRackAware.java        |  27 +
 .../hdds/scm/pipeline/MockPipelineManager.java     |  20 +
 .../ozone/container/common/TestEndPoint.java       |   4 +-
 .../org/apache/hadoop/hdds/cli/OzoneAdmin.java     |   7 +
 .../apache/hadoop/ozone/client/rpc/RpcClient.java  |   8 +-
 .../apache/hadoop/ozone/om/helpers/OmKeyInfo.java  |  36 +-
 hadoop-ozone/dist/src/main/compose/ozone/README.md |   6 +
 .../dist/src/main/compose/ozone/docker-config      |   2 +
 hadoop-ozone/dist/src/main/compose/ozone/run.sh    |   4 +
 .../compose/ozonesecure-ha/docker-compose.yaml     |   2 +
 .../src/main/compose/ozonesecure/docker-config     |   2 +-
 hadoop-ozone/dist/src/main/compose/test-all.sh     |   3 +-
 hadoop-ozone/dist/src/main/compose/testlib.sh      |  23 +-
 .../dist/src/main/compose/upgrade/README.md        |  35 +-
 .../compose/upgrade/delete-and-regenerate-data.sh  |  53 --
 .../dist/src/main/compose/upgrade/delete-data.sh   |  36 -
 hadoop-ozone/dist/src/main/compose/upgrade/test.sh |   9 +-
 .../manual-upgrade/0.5.0-1.1.0/callback.sh         |  40 --
 .../upgrades/manual-upgrade}/README.md             |   7 +-
 .../{1.0.0-1.1.0 => 1.1.0-1.2.0}/callback.sh       |  17 +-
 .../upgrade/upgrades/non-rolling-upgrade/driver.sh |  31 +-
 .../dist/src/main/compose/versions/0.5.0.sh        |  26 -
 .../dist/src/main/compose/versions/1.0.0.sh        |  26 -
 .../dist/src/main/compose/versions/1.1.0.sh        |  26 -
 hadoop-ozone/dist/src/main/compose/xcompat/test.sh |   4 -
 hadoop-ozone/dist/src/main/k8s/examples/testlib.sh |   4 +-
 .../dist/src/main/smoketest/ozonefs/setup.robot    |   2 +-
 hadoop-ozone/integration-test/pom.xml              |   5 +
 .../org/apache/hadoop/ozone/MiniOzoneCluster.java  |   7 +-
 .../apache/hadoop/ozone/MiniOzoneClusterImpl.java  |   2 +-
 .../hadoop/ozone/MiniOzoneClusterProvider.java     | 283 ++++++++
 .../hadoop/ozone/MiniOzoneHAClusterImpl.java       |  21 +-
 .../hadoop/ozone/TestOzoneConfigurationFields.java |  11 +-
 .../ozone/container/TestContainerReplication.java  |  47 --
 .../container/ozoneimpl/TestOzoneContainer.java    |   8 +-
 .../ozone/dn/TestDatanodeLayoutUpgradeTool.java    | 131 ----
 .../hadoop/ozone/freon/TestRandomKeyGenerator.java |   3 -
 .../hadoop/ozone/om/TestOzoneManagerHAWithACL.java |  24 +-
 .../ozone/om/TestOzoneManagerHAWithData.java       |  32 +-
 .../hadoop/ozone/scm/TestXceiverClientManager.java |  18 +-
 .../scm/node/TestDecommissionAndMaintenance.java   | 124 ++--
 .../hadoop/ozone/shell/TestNSSummaryAdmin.java     |  72 ++
 .../ozone/om/request/file/OMFileCreateRequest.java |   3 +-
 .../request/file/OMFileCreateRequestWithFSO.java   |   5 +-
 .../ozone/om/request/key/OMKeyCreateRequest.java   |   7 +-
 .../om/request/key/OMKeyCreateRequestWithFSO.java  |   3 +-
 .../ozone/om/upgrade/OMLayoutVersionManager.java   |   3 +-
 .../om/request/key/TestOMKeyCreateRequest.java     |  70 +-
 .../request/key/TestOMKeyCreateRequestWithFSO.java |  13 +
 .../org/apache/hadoop/fs/ozone/OzoneFsShell.java   |   4 +-
 .../apache/hadoop/ozone/recon/ReconHttpServer.java |   6 +-
 .../hadoop/ozone/recon/ReconServerConfigKeys.java  |   4 -
 .../hadoop/ozone/recon/api/NSSummaryEndpoint.java  | 137 ++--
 .../hadoop/ozone/recon/api/types/DUResponse.java   |  14 +
 .../ozone/recon/api/types/ResponseStatus.java      |   1 +
 .../ozone/recon/scm/ReconPipelineManager.java      |  12 +-
 .../webapps/recon/ozone-recon-web/api/db.json      | 324 ++++++++-
 .../webapps/recon/ozone-recon-web/api/routes.json  |  22 +-
 .../src/components/navBar/navBar.tsx               |   5 +
 .../src/components/rightDrawer/rightDrawer.tsx     |  72 ++
 .../src/constants/breadcrumbs.constants.tsx        |   3 +-
 .../webapps/recon/ozone-recon-web/src/routes.tsx   |   5 +
 .../diskUsage/diskUsage.less}                      |  41 +-
 .../src/views/diskUsage/diskUsage.tsx              | 444 ++++++++++++
 .../src/views/overview/overview.tsx                |   3 +-
 .../ozone/admin/nssummary/DiskUsageSubCommand.java | 214 ++++++
 .../admin/nssummary/FileSizeDistSubCommand.java    | 129 ++++
 .../ozone/admin/nssummary/NSSummaryAdmin.java      | 137 ++++
 .../ozone/admin/nssummary/NSSummaryCLIUtils.java   | 174 +++++
 .../admin/nssummary/QuotaUsageSubCommand.java      | 117 ++++
 .../ozone/admin/nssummary/SummarySubCommand.java   | 115 ++++
 .../ozone/admin/nssummary/package-info.java}       |  14 +-
 .../admin/scm/GetScmRatisRolesSubcommand.java      |   7 +-
 .../apache/hadoop/ozone/debug/DatanodeLayout.java  | 110 ---
 .../ozone/shell/volume/CreateVolumeHandler.java    |   4 +-
 pom.xml                                            |   2 +-
 144 files changed, 4835 insertions(+), 1364 deletions(-)

diff --cc hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
index cbbe371,00e819f..521d563
--- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
+++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java
@@@ -186,23 -187,27 +188,29 @@@ public final class ContainerUtils 
     * Verify that the checksum stored in containerData is equal to the
     * computed checksum.
     */
-   public static void verifyChecksum(ContainerData containerData)
-       throws IOException {
-     String storedChecksum = containerData.getChecksum();
- 
-     Yaml yaml = ContainerDataYaml.getYamlForContainerType(
-         containerData.getContainerType(),
-         containerData instanceof KeyValueContainerData &&
-             ((KeyValueContainerData)containerData).getReplicaIndex() > 0);
-     containerData.computeAndSetChecksum(yaml);
-     String computedChecksum = containerData.getChecksum();
- 
-     if (storedChecksum == null || !storedChecksum.equals(computedChecksum)) {
-       throw new StorageContainerException("Container checksum error for " +
-           "ContainerID: " + containerData.getContainerID() + ". " +
-           "\nStored Checksum: " + storedChecksum +
-           "\nExpected Checksum: " + computedChecksum,
-           CONTAINER_CHECKSUM_ERROR);
+   public static void verifyChecksum(ContainerData containerData,
+       ConfigurationSource conf) throws IOException {
+     boolean enabled = conf.getBoolean(
+             HddsConfigKeys.HDDS_CONTAINER_CHECKSUM_VERIFICATION_ENABLED,
+             HddsConfigKeys.
+                     HDDS_CONTAINER_CHECKSUM_VERIFICATION_ENABLED_DEFAULT);
+     if(enabled) {
+       String storedChecksum = containerData.getChecksum();
+ 
+       Yaml yaml = ContainerDataYaml.getYamlForContainerType(
 -              containerData.getContainerType());
++          containerData.getContainerType(),
++          containerData instanceof KeyValueContainerData &&
++              ((KeyValueContainerData)containerData).getReplicaIndex() > 0);
+       containerData.computeAndSetChecksum(yaml);
+       String computedChecksum = containerData.getChecksum();
+ 
+       if (storedChecksum == null || !storedChecksum.equals(computedChecksum)) {
+         throw new StorageContainerException("Container checksum error for " +
+                 "ContainerID: " + containerData.getContainerID() + ". " +
+                 "\nStored Checksum: " + storedChecksum +
+                 "\nExpected Checksum: " + computedChecksum,
+                 CONTAINER_CHECKSUM_ERROR);
+       }
      }
    }
  
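The hunk above gates checksum verification behind the new HDDS_CONTAINER_CHECKSUM_VERIFICATION_ENABLED key and changes verifyChecksum() to take a ConfigurationSource. A minimal caller-side sketch, not taken from this commit, assuming OzoneConfiguration is used as the ConfigurationSource and kvData is a KeyValueContainerData read from a .container file:

    // Sketch only: toggle the new key, then call the new two-argument overload.
    OzoneConfiguration conf = new OzoneConfiguration();
    conf.setBoolean(
        HddsConfigKeys.HDDS_CONTAINER_CHECKSUM_VERIFICATION_ENABLED, false);
    ContainerUtils.verifyChecksum(kvData, conf);  // skipped when the key is false
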
diff --cc hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDataYaml.java
index a9642e7,4dc38e9..67a21da
--- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDataYaml.java
+++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDataYaml.java
@@@ -27,7 -28,7 +29,8 @@@ import org.apache.hadoop.ozone.OzoneCon
  import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils;
  import org.apache.hadoop.ozone.container.keyvalue.ChunkLayoutTestInfo;
  import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData;
 +import org.junit.Assert;
+ import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures;
  import org.apache.ozone.test.GenericTestUtils;
  import org.junit.Test;
  import org.junit.runner.RunWith;
@@@ -90,8 -90,8 +94,9 @@@ public class TestContainerDataYaml 
      keyValueContainerData.setMetadataPath(testRoot);
      keyValueContainerData.setChunksPath(testRoot);
      keyValueContainerData.updateDataScanTime(SCAN_TIME);
-     keyValueContainerData.setSchemaVersion(OzoneConsts.SCHEMA_LATEST);
+     keyValueContainerData.setSchemaVersion(
+         VersionedDatanodeFeatures.SchemaV2.chooseSchemaVersion());
 +    keyValueContainerData.setReplicaIndex(replicaIndex);
  
      File containerFile = new File(testRoot, containerPath);
  
@@@ -135,9 -135,8 +140,9 @@@
                   kvData.lastDataScanTime().get().toEpochMilli());
      assertEquals(SCAN_TIME.toEpochMilli(),
                   kvData.getDataScanTimestamp().longValue());
-     assertEquals(OzoneConsts.SCHEMA_LATEST,
+     assertEquals(VersionedDatanodeFeatures.SchemaV2.chooseSchemaVersion(),
 -            kvData.getSchemaVersion());
 +        kvData.getSchemaVersion());
 +    assertEquals(7, kvData.getReplicaIndex());
  
      // Update ContainerData.
      kvData.addMetadata(OzoneConsts.VOLUME, VOLUME_OWNER);
@@@ -249,24 -233,7 +254,24 @@@
    public void testChecksumInContainerFile() throws IOException {
      long containerID = testContainerID++;
  
 -    File containerFile = createContainerFile(containerID);
 +    File containerFile = createContainerFile(containerID, 0);
 +
 +    // Read from .container file, and verify data.
 +    KeyValueContainerData kvData = (KeyValueContainerData) ContainerDataYaml
 +        .readContainerFile(containerFile);
-     ContainerUtils.verifyChecksum(kvData);
++    ContainerUtils.verifyChecksum(kvData, conf);
 +
 +    cleanup();
 +  }
 +
 +  /**
 +   * Test to verify {@link ContainerUtils#verifyChecksum(ContainerData)}.
 +   */
 +  @Test
 +  public void testChecksumInContainerFileWithReplicaIndex() throws IOException {
 +    long containerID = testContainerID++;
 +
 +    File containerFile = createContainerFile(containerID, 10);
  
      // Read from .container file, and verify data.
      KeyValueContainerData kvData = (KeyValueContainerData) ContainerDataYaml
diff --cc hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
index dd9f514,c6651f4..4d76cb7
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java
@@@ -173,10 -163,9 +172,10 @@@ public class PipelineManagerImpl implem
        throw new IOException(message);
      }
  
-     lock.lock();
+     acquireWriteLock();
      try {
 -      Pipeline pipeline = pipelineFactory.create(replicationConfig);
 +      Pipeline pipeline = pipelineFactory.create(replicationConfig,
 +          excludedNodes, favoredNodes);
        stateManager.addPipeline(pipeline.getProtobufMessage(
            ClientVersions.CURRENT_VERSION));
        recordMetricsForPipeline(pipeline);
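PipelineManagerImpl now takes its write lock through acquireWriteLock() and forwards the excluded and favoured node lists to the pipeline factory, matching the three-argument createPipeline() overload added to the PipelineManager interface in this merge. A hedged usage sketch, not from this commit, assuming a PipelineManager reference named pipelineManager and pre-built DatanodeDetails lists:

    // Sketch only: steer placement with the new overload; either list may be
    // Collections.emptyList() to fall back to the default behaviour.
    Pipeline pipeline = pipelineManager.createPipeline(
        replicationConfig, excludedNodes, favoredNodes);
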
diff --cc hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
index 4ef5fcd,0000000..dba57df
mode 100644,000000..100644
--- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
+++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java
@@@ -1,822 -1,0 +1,842 @@@
 +/**
 + * Licensed to the Apache Software Foundation (ASF) under one
 + * or more contributor license agreements.  See the NOTICE file
 + * distributed with this work for additional information
 + * regarding copyright ownership.  The ASF licenses this file
 + * to you under the Apache License, Version 2.0 (the
 + * "License"); you may not use this file except in compliance
 + * with the License.  You may obtain a copy of the License at
 + *
 + *     http://www.apache.org/licenses/LICENSE-2.0
 + *
 + * Unless required by applicable law or agreed to in writing, software
 + * distributed under the License is distributed on an "AS IS" BASIS,
 + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 + * See the License for the specific language governing permissions and
 + * limitations under the License.
 + */
 +
 +package org.apache.hadoop.hdds.scm.pipeline;
 +
 +import javax.management.ObjectName;
 +import java.io.IOException;
 +import java.time.Duration;
 +import java.time.Instant;
 +import java.util.Collection;
 +import java.util.Collections;
 +import java.util.HashMap;
 +import java.util.List;
 +import java.util.Map;
 +import java.util.NavigableSet;
 +import java.util.Set;
 +import java.util.concurrent.TimeUnit;
 +import java.util.concurrent.atomic.AtomicBoolean;
 +import java.util.concurrent.locks.ReadWriteLock;
 +import java.util.concurrent.locks.ReentrantReadWriteLock;
 +
 +import com.google.common.annotations.VisibleForTesting;
 +import org.apache.hadoop.hdds.HddsConfigKeys;
 +import org.apache.hadoop.hdds.client.RatisReplicationConfig;
 +import org.apache.hadoop.hdds.client.ReplicationConfig;
 +import org.apache.hadoop.hdds.conf.ConfigurationSource;
 +import org.apache.hadoop.hdds.protocol.DatanodeDetails;
 +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor;
 +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType;
 +import org.apache.hadoop.hdds.scm.ScmConfigKeys;
 +import org.apache.hadoop.hdds.scm.container.ContainerID;
 +import org.apache.hadoop.hdds.scm.events.SCMEvents;
 +import org.apache.hadoop.hdds.scm.exceptions.SCMException;
 +import org.apache.hadoop.hdds.scm.ha.SCMContext;
 +import org.apache.hadoop.hdds.scm.node.NodeManager;
 +import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus;
 +import org.apache.hadoop.hdds.server.events.EventHandler;
 +import org.apache.hadoop.hdds.server.events.EventPublisher;
 +import org.apache.hadoop.hdds.utils.Scheduler;
 +import org.apache.hadoop.hdds.utils.db.Table;
 +import org.apache.hadoop.hdds.utils.db.Table.KeyValue;
 +import org.apache.hadoop.hdds.utils.db.TableIterator;
 +import org.apache.hadoop.metrics2.util.MBeans;
 +import org.apache.hadoop.util.Time;
 +import org.slf4j.Logger;
 +import org.slf4j.LoggerFactory;
 +
 +import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE;
 +
 +/**
 + * Implements api needed for management of pipelines. All the write operations
 + * for pipelines must come via PipelineManager. It synchronises all write
 + * and read operations via a ReadWriteLock.
 + */
 +public class SCMPipelineManager implements
 +    PipelineManager, EventHandler<SafeModeStatus> {
 +
 +  private static final Logger LOG =
 +      LoggerFactory.getLogger(SCMPipelineManager.class);
 +
 +  private final ReadWriteLock lock;
 +  private PipelineFactory pipelineFactory;
 +  private StateManager stateManager;
 +  private final BackgroundPipelineCreator backgroundPipelineCreator;
 +  private Scheduler scheduler;
 +
 +  private final EventPublisher eventPublisher;
 +  private final NodeManager nodeManager;
 +  private final SCMPipelineMetrics metrics;
 +  private final ConfigurationSource conf;
 +  private long pipelineWaitDefaultTimeout;
 +  // Pipeline Manager MXBean
 +  private ObjectName pmInfoBean;
 +
 +  private Table<PipelineID, Pipeline> pipelineStore;
 +
 +  private final AtomicBoolean isInSafeMode;
 +  // Used to track if the safemode pre-checks have completed. This is designed
 +  // to prevent pipelines being created until sufficient nodes have registered.
 +  private final AtomicBoolean pipelineCreationAllowed;
 +
 +  // This allows for freezing/resuming the new pipeline creation while the
 +  // SCM is already out of SafeMode.
 +  private AtomicBoolean freezePipelineCreation;
 +
 +  public SCMPipelineManager(ConfigurationSource conf,
 +      NodeManager nodeManager,
 +      Table<PipelineID, Pipeline> pipelineStore,
 +      EventPublisher eventPublisher)
 +      throws IOException {
 +    this(conf, nodeManager, pipelineStore, eventPublisher, null, null);
 +    this.stateManager = new PipelineStateManager();
 +    this.pipelineFactory = new PipelineFactory(nodeManager,
 +        stateManager, conf, eventPublisher, SCMContext.emptyContext());
 +    this.pipelineStore = pipelineStore;
 +    initializePipelineState();
 +  }
 +
 +  protected SCMPipelineManager(ConfigurationSource conf,
 +      NodeManager nodeManager,
 +      Table<PipelineID, Pipeline> pipelineStore,
 +      EventPublisher eventPublisher,
 +      PipelineStateManager pipelineStateManager,
 +      PipelineFactory pipelineFactory)
 +      throws IOException {
 +    this.lock = new ReentrantReadWriteLock();
 +    this.pipelineStore = pipelineStore;
 +    this.conf = conf;
 +    this.pipelineFactory = pipelineFactory;
 +    this.stateManager = pipelineStateManager;
 +    // TODO: See if thread priority needs to be set for these threads
 +    scheduler = new Scheduler("RatisPipelineUtilsThread", false, 1);
 +    this.backgroundPipelineCreator =
 +        new BackgroundPipelineCreator(this, scheduler, conf);
 +    this.eventPublisher = eventPublisher;
 +    this.nodeManager = nodeManager;
 +    this.metrics = SCMPipelineMetrics.create();
 +    this.pmInfoBean = MBeans.register("SCMPipelineManager",
 +        "SCMPipelineManagerInfo", this);
 +    this.pipelineWaitDefaultTimeout = conf.getTimeDuration(
 +        HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL,
 +        HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL_DEFAULT,
 +        TimeUnit.MILLISECONDS);
 +    this.isInSafeMode = new AtomicBoolean(conf.getBoolean(
 +        HddsConfigKeys.HDDS_SCM_SAFEMODE_ENABLED,
 +        HddsConfigKeys.HDDS_SCM_SAFEMODE_ENABLED_DEFAULT));
 +    // Pipeline creation is only allowed after the safemode prechecks have
 +    // passed, eg sufficient nodes have registered.
 +    this.pipelineCreationAllowed = new AtomicBoolean(!this.isInSafeMode.get());
 +    // controls freezing/resuming pipeline creation regardless of SafeMode
 +    // status.
 +    this.freezePipelineCreation = new AtomicBoolean(false);
 +  }
 +
 +  public StateManager getStateManager() {
 +    return stateManager;
 +  }
 +
 +  @VisibleForTesting
 +  public void setPipelineProvider(ReplicationType replicationType,
 +                                  PipelineProvider provider) {
 +    pipelineFactory.setProvider(replicationType, provider);
 +  }
 +
 +  @VisibleForTesting
 +  public void allowPipelineCreation() {
 +    this.pipelineCreationAllowed.set(true);
 +  }
 +
 +  @VisibleForTesting
 +  public boolean isPipelineCreationAllowed() {
 +    return pipelineCreationAllowed.get();
 +  }
 +
 +  protected void initializePipelineState() throws IOException {
 +    if (pipelineStore.isEmpty()) {
 +      LOG.info("No pipeline exists in current db");
 +      return;
 +    }
 +    TableIterator<PipelineID, ? extends KeyValue<PipelineID, Pipeline>>
 +        iterator = pipelineStore.iterator();
 +    while (iterator.hasNext()) {
 +      Pipeline pipeline = nextPipelineFromIterator(iterator);
 +      stateManager.addPipeline(pipeline);
 +      nodeManager.addPipeline(pipeline);
 +    }
 +  }
 +
 +  private Pipeline nextPipelineFromIterator(
 +      TableIterator<PipelineID, ? extends KeyValue<PipelineID, Pipeline>> it
 +  ) throws IOException {
 +    KeyValue<PipelineID, Pipeline> actual = it.next();
 +    Pipeline pipeline = actual.getValue();
 +    PipelineID pipelineID = actual.getKey();
 +    checkKeyAndReplaceIfObsolete(it, pipeline, pipelineID);
 +    return pipeline;
 +  }
 +
 +  /**
 +   * This method is part of the change that happens in HDDS-3925, and we can
 +   * and should remove this later on.
 +   * The purpose of the change is to get rid of protobuf serialization in the
 +   * SCM database Pipeline table keys. The keys are not used anywhere, and the
 +   * PipelineID that is used as a key is in the value as well, so we can detect
 +   * a change in the key translation to byte[] and if we have the old format
 +   * we refresh the table contents during SCM startup.
 +   *
 +   * If this fails in the remove, then there is an IOException coming from
 +   * RocksDB itself, in this case in memory structures will still be fine and
 +   * SCM should be operational, however we will attempt to replace the old key
 +   * at next startup. In this case removing of the pipeline will leave the
 +   * pipeline in RocksDB, and during next startup we will attempt to delete it
 +   * again. This does not affect any runtime operations.
 +   * If a Pipeline should have been deleted but remained in RocksDB, then at
 +   * next startup it will be replaced and added with the new key, then SCM will
 +   * detect that it is an invalid Pipeline and successfully delete it with the
 +   * new key.
 +   * For further info check the JIRA.
 +   *
 +   * @param it the iterator used to iterate the Pipeline table
 +   * @param pipeline the pipeline read already from the iterator
 +   * @param pipelineID the pipeline ID read from the raw data via the iterator
 +   */
 +  private void checkKeyAndReplaceIfObsolete(
 +      TableIterator<PipelineID, ? extends KeyValue<PipelineID, Pipeline>> it,
 +      Pipeline pipeline,
 +      PipelineID pipelineID
 +  ) {
 +    if (!pipelineID.equals(pipeline.getId())) {
 +      try {
 +        LOG.info("Found pipeline in old format key : {}", pipeline.getId());
 +        it.removeFromDB();
 +        pipelineStore.put(pipeline.getId(), pipeline);
 +      } catch (IOException e) {
 +        LOG.info("Pipeline table in RocksDB has an old key format, and "
 +            + "removing the pipeline with the old key was unsuccessful."
 +            + "Pipeline: {}", pipeline);
 +      }
 +    }
 +  }
 +
 +  private void recordMetricsForPipeline(Pipeline pipeline) {
 +    metrics.incNumPipelineAllocated();
 +    if (pipeline.isOpen()) {
 +      metrics.incNumPipelineCreated();
 +      metrics.createPerPipelineMetrics(pipeline);
 +    }
 +    switch (pipeline.getType()) {
 +    case STAND_ALONE:
 +      return;
 +    case RATIS:
 +      List<Pipeline> overlapPipelines = RatisPipelineUtils
 +          .checkPipelineContainSameDatanodes(stateManager, pipeline);
 +      if (!overlapPipelines.isEmpty()) {
 +        // Count 1 overlap at a time.
 +        metrics.incNumPipelineContainSameDatanodes();
 +        //TODO remove until pipeline allocation is proved equally distributed.
 +        for (Pipeline overlapPipeline : overlapPipelines) {
 +          LOG.info("Pipeline: " + pipeline.getId().toString() +
 +              " contains same datanodes as previous pipelines: " +
 +              overlapPipeline.getId().toString() + " nodeIds: " +
 +              pipeline.getNodes().get(0).getUuid().toString() +
 +              ", " + pipeline.getNodes().get(1).getUuid().toString() +
 +              ", " + pipeline.getNodes().get(2).getUuid().toString());
 +        }
 +      }
 +      return;
 +    case CHAINED:
 +      // Not supported.
 +    default:
 +      // Not supported.
 +      return;
 +    }
 +  }
 +
 +  @Override
 +  public Pipeline createPipeline(ReplicationConfig replicationConfig)
 +      throws IOException {
 +    return createPipeline(replicationConfig, Collections.emptyList(),
 +        Collections.emptyList());
 +  }
 +
 +  @Override
 +  public Pipeline createPipeline(ReplicationConfig replicationConfig,
 +      List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes)
 +      throws IOException {
 +    if (!isPipelineCreationAllowed()
 +        && replicationConfig.getRequiredNodes() != 1) {
 +      LOG.debug("Pipeline creation is not allowed until safe mode prechecks " +
 +          "complete");
 +      throw new IOException("Pipeline creation is not allowed as safe mode " +
 +          "prechecks have not yet passed");
 +    }
 +    if (freezePipelineCreation.get()) {
 +      LOG.debug("Pipeline creation is frozen while an upgrade is in " +
 +          "progress");
 +      throw new IOException("Pipeline creation is frozen while an upgrade " +
 +          "is in progress");
 +    }
 +    lock.writeLock().lock();
 +    try {
 +      Pipeline pipeline = pipelineFactory.create(replicationConfig,
 +          excludedNodes, favoredNodes);
 +      if (pipelineStore != null) {
 +        pipelineStore.put(pipeline.getId(), pipeline);
 +      }
 +      stateManager.addPipeline(pipeline);
 +      nodeManager.addPipeline(pipeline);
 +      recordMetricsForPipeline(pipeline);
 +      return pipeline;
 +    } catch (IOException ex) {
 +      if (ex instanceof SCMException &&
 +          ((SCMException) ex).getResult() == FAILED_TO_FIND_SUITABLE_NODE) {
 +        // Avoid spam SCM log with errors when SCM has enough open pipelines
 +        LOG.debug("Can't create more pipelines of replicationConfig: {}. " +
 +            "Reason: {}", replicationConfig, ex.getMessage());
 +      } else {
 +        LOG.error("Failed to create pipeline of replicationConfig: {}. " +
 +            "Exception: {}", replicationConfig, ex.getMessage());
 +      }
 +      metrics.incNumPipelineCreationFailed();
 +      throw ex;
 +    } finally {
 +      lock.writeLock().unlock();
 +    }
 +  }
 +
 +  @Override
 +  public Pipeline createPipeline(ReplicationConfig replicationConfig,
 +      List<DatanodeDetails> nodes) {
 +    // This will mostly be used to create dummy pipeline for SimplePipelines.
 +    // We don't update the metrics for SimplePipelines.
 +    lock.writeLock().lock();
 +    try {
 +      return pipelineFactory.create(replicationConfig, nodes);
 +    } finally {
 +      lock.writeLock().unlock();
 +    }
 +  }
 +
 +  @Override
 +  public Pipeline getPipeline(PipelineID pipelineID)
 +      throws PipelineNotFoundException {
 +    lock.readLock().lock();
 +    try {
 +      return stateManager.getPipeline(pipelineID);
 +    } finally {
 +      lock.readLock().unlock();
 +    }
 +  }
 +
 +  @Override
 +  public boolean containsPipeline(PipelineID pipelineID) {
 +    lock.readLock().lock();
 +    try {
 +      getPipeline(pipelineID);
 +      return true;
 +    } catch (PipelineNotFoundException e) {
 +      return false;
 +    } finally {
 +      lock.readLock().unlock();
 +    }
 +  }
 +
 +  @Override
 +  public List<Pipeline> getPipelines() {
 +    lock.readLock().lock();
 +    try {
 +      return stateManager.getPipelines();
 +    } finally {
 +      lock.readLock().unlock();
 +    }
 +  }
 +
 +  @Override
 +  public List<Pipeline> getPipelines(ReplicationConfig replicationConfig) {
 +    lock.readLock().lock();
 +    try {
 +      return stateManager.getPipelines(replicationConfig);
 +    } finally {
 +      lock.readLock().unlock();
 +    }
 +  }
 +
 +  @Override
 +  public List<Pipeline> getPipelines(ReplicationConfig replicationConfig,
 +      Pipeline.PipelineState state) {
 +    lock.readLock().lock();
 +    try {
 +      return stateManager.getPipelines(replicationConfig, state);
 +    } finally {
 +      lock.readLock().unlock();
 +    }
 +  }
 +
 +  @Override
 +  public List<Pipeline> getPipelines(ReplicationConfig replicationConfig,
 +      Pipeline.PipelineState state,
 +      Collection<DatanodeDetails> excludeDns,
 +      Collection<PipelineID> excludePipelines) {
 +    lock.readLock().lock();
 +    try {
 +      return stateManager
 +          .getPipelines(replicationConfig, state, excludeDns, excludePipelines);
 +    } finally {
 +      lock.readLock().unlock();
 +    }
 +  }
 +
 +  @Override
 +  public int getPipelineCount(ReplicationConfig replicationConfig,
 +      Pipeline.PipelineState state) {
 +    lock.readLock().lock();
 +    try {
 +      return stateManager.getPipelineCount(replicationConfig, state);
 +    } finally {
 +      lock.readLock().unlock();
 +    }
 +  }
 +
 +  @Override
 +  public void addContainerToPipeline(PipelineID pipelineID,
 +      ContainerID containerID) throws IOException {
 +    lock.writeLock().lock();
 +    try {
 +      stateManager.addContainerToPipeline(pipelineID, containerID);
 +    } finally {
 +      lock.writeLock().unlock();
 +    }
 +  }
 +
 +  private void updatePipelineStateInDb(PipelineID pipelineId,
 +                                       Pipeline.PipelineState oldState)
 +          throws IOException {
 +    // null check is here to prevent the case where SCM store
 +    // is closed but the staleNode handlers/pipeline creations
 +    // still try to access it.
 +    if (pipelineStore != null) {
 +      try {
 +        pipelineStore.put(pipelineId, getPipeline(pipelineId));
 +      } catch (IOException ex) {
 +        LOG.warn("Pipeline {} state update failed", pipelineId);
 +        // revert back to old state in memory
 +        stateManager.updatePipelineState(pipelineId, oldState);
 +      }
 +    }
 +  }
 +
 +  @Override
 +  public void removeContainerFromPipeline(PipelineID pipelineID,
 +      ContainerID containerID) throws IOException {
 +    lock.writeLock().lock();
 +    try {
 +      stateManager.removeContainerFromPipeline(pipelineID, containerID);
 +    } finally {
 +      lock.writeLock().unlock();
 +    }
 +  }
 +
 +  @Override
 +  public NavigableSet<ContainerID> getContainersInPipeline(
 +      PipelineID pipelineID) throws IOException {
 +    lock.readLock().lock();
 +    try {
 +      return stateManager.getContainers(pipelineID);
 +    } finally {
 +      lock.readLock().unlock();
 +    }
 +  }
 +
 +  @Override
 +  public int getNumberOfContainers(PipelineID pipelineID) throws IOException {
 +    return stateManager.getNumberOfContainers(pipelineID);
 +  }
 +
 +  @Override
 +  public void openPipeline(PipelineID pipelineId) throws IOException {
 +    lock.writeLock().lock();
 +    try {
 +      Pipeline.PipelineState state = stateManager.
 +              getPipeline(pipelineId).getPipelineState();
 +      Pipeline pipeline = stateManager.openPipeline(pipelineId);
 +      updatePipelineStateInDb(pipelineId, state);
 +      metrics.incNumPipelineCreated();
 +      metrics.createPerPipelineMetrics(pipeline);
 +    } finally {
 +      lock.writeLock().unlock();
 +    }
 +  }
 +
 +  /**
 +   * Fire events to close all containers related to the input pipeline.
 +   * @param pipelineId - ID of the pipeline.
 +   * @throws IOException
 +   */
 +  protected void closeContainersForPipeline(final PipelineID pipelineId)
 +      throws IOException {
 +    Set<ContainerID> containerIDs = stateManager.getContainers(pipelineId);
 +    for (ContainerID containerID : containerIDs) {
 +      eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerID);
 +    }
 +  }
 +
 +  /**
 +   * put pipeline in CLOSED state.
 +   * @param pipeline - ID of the pipeline.
 +   * @param onTimeout - whether to remove pipeline after some time.
 +   * @throws IOException
 +   */
 +  @Override
 +  public void closePipeline(Pipeline pipeline, boolean onTimeout)
 +      throws IOException {
 +    PipelineID pipelineID = pipeline.getId();
 +    lock.writeLock().lock();
 +    try {
 +      if (!pipeline.isClosed()) {
 +        stateManager.updatePipelineState(pipelineID,
 +            Pipeline.PipelineState.CLOSED);
 +        LOG.info("Pipeline {} moved to CLOSED state", pipeline);
 +      }
 +      metrics.removePipelineMetrics(pipelineID);
 +    } finally {
 +      lock.writeLock().unlock();
 +    }
 +    // close containers.
 +    closeContainersForPipeline(pipelineID);
 +    if (!onTimeout) {
 +      // close pipeline right away.
 +      removePipeline(pipeline);
 +    }
 +  }
 +
 +  /**
 +   * Scrub pipelines.
 +   */
 +  @Override
 +  public void scrubPipeline(ReplicationConfig replicationConfig)
 +      throws IOException {
 +    if (!RatisReplicationConfig.hasFactor(replicationConfig,
 +        ReplicationFactor.THREE)) {
 +      // Only scrub pipeline for RATIS THREE pipeline
 +      return;
 +    }
 +    Instant currentTime = Instant.now();
 +    Long pipelineScrubTimeoutInMills = conf.getTimeDuration(
 +        ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT,
 +        ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT_DEFAULT,
 +        TimeUnit.MILLISECONDS);
 +
 +    List<Pipeline> candidates = stateManager.getPipelines(replicationConfig);
 +
 +    for (Pipeline p : candidates) {
 +      // scrub pipelines who stay ALLOCATED for too long.
 +      if (p.getPipelineState() == Pipeline.PipelineState.ALLOCATED &&
 +          (currentTime.toEpochMilli() - p.getCreationTimestamp()
 +              .toEpochMilli() >= pipelineScrubTimeoutInMills)) {
 +        LOG.info("Scrubbing pipeline: id: " + p.getId().toString() +
 +            " since it stays at ALLOCATED stage for " +
 +            Duration.between(currentTime, p.getCreationTimestamp())
 +                .toMinutes() + " mins.");
 +        closePipeline(p, false);
 +      }
 +      // scrub pipelines who stay CLOSED for too long.
 +      if (p.getPipelineState() == Pipeline.PipelineState.CLOSED) {
 +        LOG.info("Scrubbing pipeline: id: " + p.getId().toString() +
 +            " since it is at CLOSED stage.");
 +        closeContainersForPipeline(p.getId());
 +        removePipeline(p);
 +      }
 +    }
 +    return;
 +  }
 +
 +  @Override
 +  public Map<String, Integer> getPipelineInfo() {
 +    final Map<String, Integer> pipelineInfo = new HashMap<>();
 +    for (Pipeline.PipelineState state : Pipeline.PipelineState.values()) {
 +      pipelineInfo.put(state.toString(), 0);
 +    }
 +    stateManager.getPipelines().forEach(pipeline ->
 +        pipelineInfo.computeIfPresent(
 +            pipeline.getPipelineState().toString(), (k, v) -> v + 1));
 +    return pipelineInfo;
 +  }
 +
 +  /**
 +   * Schedules a fixed interval job to create pipelines.
 +   */
 +  @Override
 +  public void startPipelineCreator() {
 +    backgroundPipelineCreator.startFixedIntervalPipelineCreator();
 +  }
 +
 +  /**
 +   * Triggers pipeline creation after the specified time.
 +   */
 +  @Override
 +  public void triggerPipelineCreation() {
 +    backgroundPipelineCreator.triggerPipelineCreation();
 +  }
 +
 +  /**
 +   * Activates a dormant pipeline.
 +   *
 +   * @param pipelineID ID of the pipeline to activate.
 +   * @throws IOException in case of any Exception
 +   */
 +  @Override
 +  public void activatePipeline(PipelineID pipelineID)
 +      throws IOException {
 +    lock.writeLock().lock();
 +    try {
 +      Pipeline.PipelineState state = stateManager.
 +              getPipeline(pipelineID).getPipelineState();
 +      stateManager.activatePipeline(pipelineID);
 +      updatePipelineStateInDb(pipelineID, state);
 +    } finally {
 +      lock.writeLock().unlock();
 +    }
 +  }
 +
 +  /**
 +   * Deactivates an active pipeline.
 +   *
 +   * @param pipelineID ID of the pipeline to deactivate.
 +   * @throws IOException in case of any Exception
 +   */
 +  @Override
 +  public void deactivatePipeline(PipelineID pipelineID)
 +      throws IOException {
 +    lock.writeLock().lock();
 +    try {
 +      Pipeline.PipelineState state = stateManager.
 +              getPipeline(pipelineID).getPipelineState();
 +      stateManager.deactivatePipeline(pipelineID);
 +      updatePipelineStateInDb(pipelineID, state);
 +    } finally {
 +      lock.writeLock().unlock();
 +    }
 +  }
 +
 +  /**
 +   * Wait a pipeline to be OPEN.
 +   *
 +   * @param pipelineID ID of the pipeline to wait for.
 +   * @param timeout    wait timeout, millisecond, 0 to use default value
 +   * @throws IOException in case of any Exception, such as timeout
 +   */
 +  @Override
 +  public void waitPipelineReady(PipelineID pipelineID, long timeout)
 +      throws IOException {
 +    long st = Time.monotonicNow();
 +    if (timeout == 0) {
 +      timeout = pipelineWaitDefaultTimeout;
 +    }
 +
 +    boolean ready;
 +    Pipeline pipeline;
 +    do {
 +      try {
 +        pipeline = stateManager.getPipeline(pipelineID);
 +      } catch (PipelineNotFoundException e) {
 +        throw new PipelineNotFoundException(String.format(
 +            "Pipeline %s cannot be found", pipelineID));
 +      }
 +      ready = pipeline.isOpen();
 +      if (!ready) {
 +        try {
 +          Thread.sleep((long)100);
 +        } catch (InterruptedException e) {
 +          Thread.currentThread().interrupt();
 +        }
 +      }
 +    } while (!ready && Time.monotonicNow() - st < timeout);
 +
 +    if (!ready) {
 +      throw new IOException(String.format("Pipeline %s is not ready in %d ms",
 +          pipelineID, timeout));
 +    }
 +  }
 +
 +  /**
 +   * Removes the pipeline from the db and pipeline state map.
 +   *
 +   * @param pipeline - pipeline to be removed
 +   * @throws IOException
 +   */
 +  protected void removePipeline(Pipeline pipeline) throws IOException {
 +    pipelineFactory.close(pipeline.getType(), pipeline);
 +    PipelineID pipelineID = pipeline.getId();
 +    lock.writeLock().lock();
 +    try {
 +      if (pipelineStore != null) {
 +        pipelineStore.delete(pipelineID);
 +        Pipeline pipelineRemoved = stateManager.removePipeline(pipelineID);
 +        nodeManager.removePipeline(pipelineRemoved);
 +        metrics.incNumPipelineDestroyed();
 +      }
 +    } catch (IOException ex) {
 +      metrics.incNumPipelineDestroyFailed();
 +      throw ex;
 +    } finally {
 +      lock.writeLock().unlock();
 +    }
 +  }
 +
 +  @Override
 +  public void incNumBlocksAllocatedMetric(PipelineID id) {
 +    metrics.incNumBlocksAllocated(id);
 +  }
 +
 +  @Override
 +  public void close() throws IOException {
 +    if (scheduler != null) {
 +      scheduler.close();
 +      scheduler = null;
 +    }
 +
 +    if(pmInfoBean != null) {
 +      MBeans.unregister(this.pmInfoBean);
 +      pmInfoBean = null;
 +    }
 +
 +    SCMPipelineMetrics.unRegister();
 +
 +    // shutdown pipeline provider.
 +    pipelineFactory.shutdown();
 +    lock.writeLock().lock();
 +    try {
 +      if (pipelineStore != null) {
 +        pipelineStore.close();
 +        pipelineStore = null;
 +      }
 +    } catch (Exception ex) {
 +      LOG.error("Pipeline  store close failed", ex);
 +    } finally {
 +      lock.writeLock().unlock();
 +    }
 +  }
 +
 +  /**
 +   * returns min number of healthy volumes from the set of
 +   * datanodes constituting the pipeline.
 +   * @param  pipeline
 +   * @return healthy volume count
 +   */
 +  @Override
 +  public int minHealthyVolumeNum(Pipeline pipeline) {
 +    return nodeManager.minHealthyVolumeNum(pipeline.getNodes());
 +  }
 +
 +  /**
 +   * returns max count of raft log volumes from the set of
 +   * datanodes constituting the pipeline.
 +   * @param  pipeline
 +   * @return healthy volume count
 +   */
 +  @Override
 +  public int minPipelineLimit(Pipeline pipeline) {
 +    return nodeManager.minPipelineLimit(pipeline.getNodes());
 +  }
 +
 +  protected ReadWriteLock getLock() {
 +    return lock;
 +  }
 +
 +  @VisibleForTesting
 +  public PipelineFactory getPipelineFactory() {
 +    return pipelineFactory;
 +  }
 +
 +  protected NodeManager getNodeManager() {
 +    return nodeManager;
 +  }
 +
 +  @Override
 +  public boolean getSafeModeStatus() {
 +    return this.isInSafeMode.get();
 +  }
 +
 +  @Override
 +  public void reinitialize(Table<PipelineID, Pipeline> store)
 +      throws IOException {
 +    throw new RuntimeException("Not supported operation.");
 +  }
 +
 +  @Override
 +  public void freezePipelineCreation() {
 +    freezePipelineCreation.set(true);
 +    backgroundPipelineCreator.pause();
 +  }
 +
 +  @Override
 +  public void resumePipelineCreation() {
 +    freezePipelineCreation.set(false);
 +    backgroundPipelineCreator.resume();
 +  }
 +
++  @Override
++  public void acquireReadLock() {
++
++  }
++
++  @Override
++  public void releaseReadLock() {
++
++  }
++
++  @Override
++  public void acquireWriteLock() {
++
++  }
++
++  @Override
++  public void releaseWriteLock() {
++
++  }
++
 +  public Table<PipelineID, Pipeline> getPipelineStore() {
 +    return pipelineStore;
 +  }
 +
 +  @Override
 +  public void onMessage(SafeModeStatus status,
 +                        EventPublisher publisher) {
 +    // TODO: #CLUTIL - handle safemode getting re-enabled
 +    boolean currentAllowPipelines =
 +        pipelineCreationAllowed.getAndSet(status.isPreCheckComplete());
 +    boolean currentlyInSafeMode =
 +        isInSafeMode.getAndSet(status.isInSafeMode());
 +
 +    // Trigger pipeline creation only if the preCheck status has changed to
 +    // complete.
 +    if (isPipelineCreationAllowed() && !currentAllowPipelines) {
 +      triggerPipelineCreation();
 +    }
 +    // Start the pipeline creation thread only when safemode switches off
 +    if (!getSafeModeStatus() && currentlyInSafeMode) {
 +      startPipelineCreator();
 +    }
 +  }
 +
 +  @VisibleForTesting
 +  protected static Logger getLog() {
 +    return LOG;
 +  }
 +}
diff --cc hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
index 1ee7bce,793dd30..55715bd
--- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
+++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java
@@@ -64,11 -65,8 +66,13 @@@ public class TestOzoneConfigurationFiel
          ".duration"); // Deprecated config
      configurationPropsToSkipCompare
          .add(ScmConfig.ConfigStrings.HDDS_SCM_INIT_DEFAULT_LAYOUT_VERSION);
 +    // Currently replication and type configs moved to server side.
 +    configurationPropsToSkipCompare
 +        .add(OzoneConfigKeys.OZONE_REPLICATION);
 +    configurationPropsToSkipCompare
 +        .add(OzoneConfigKeys.OZONE_REPLICATION_TYPE);
+     // This property is tested in TestHttpServer2 instead
+     xmlPropsToSkipCompare.add(HttpServer2.HTTP_IDLE_TIMEOUT_MS_KEY);
      addPropertiesNotInXml();
    }
  

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
