This is an automated email from the ASF dual-hosted git repository.

sodonnell pushed a commit to branch HDDS-3816-ec
in repository https://gitbox.apache.org/repos/asf/ozone.git
commit 2df6c9caf594b6fccfe965984b361e7bc194a01e
Merge: 90c4d92 e392c1a
Author: S O'Donnell <[email protected]>
AuthorDate: Fri Sep 10 15:20:53 2021 +0100

    Merge branch 'master' into ec branch

 .github/workflows/post-commit.yml | 2 +-
 .../org/apache/hadoop/hdds/HddsConfigKeys.java | 5 +
 .../apache/hadoop/hdds/recon/ReconConfigKeys.java | 10 +
 .../org/apache/hadoop/hdds/scm/ScmConfigKeys.java | 10 +-
 .../org/apache/hadoop/hdds/scm/ha/SCMHAUtils.java | 2 +
 .../hadoop/hdds/scm/net/NetworkTopologyImpl.java | 24 +-
 .../java/org/apache/hadoop/ozone/OzoneConsts.java | 2 -
 .../upgrade/AbstractLayoutVersionManager.java | 158 +++--
 .../common/src/main/resources/ozone-default.xml | 34 +-
 .../hdds/scm/net/TestNetworkTopologyImpl.java | 33 +-
 .../apache/hadoop/ozone/HddsDatanodeService.java | 5 +-
 .../container/common/helpers/ContainerUtils.java | 38 +-
 .../ozone/container/common/impl/ContainerSet.java | 3 +
 .../container/common/impl/HddsDispatcher.java | 1 +
 .../common/interfaces/ContainerDispatcher.java | 7 +-
 .../common/statemachine/DatanodeConfiguration.java | 21 +
 .../common/statemachine/DatanodeStateMachine.java | 6 +-
 .../CloseContainerCommandHandler.java | 2 +
 .../ClosePipelineCommandHandler.java | 13 +-
 .../commandhandler/DeleteBlocksCommandHandler.java | 4 +-
 .../states/endpoint/VersionEndpointTask.java | 5 +-
 .../server/ratis/ContainerStateMachine.java | 11 +-
 .../container/common/utils/HddsVolumeUtil.java | 72 +-
 .../ozone/container/common/volume/HddsVolume.java | 2 +-
 .../container/common/volume/MutableVolumeSet.java | 13 -
 .../container/keyvalue/KeyValueContainer.java | 12 +-
 .../container/keyvalue/KeyValueContainerCheck.java | 4 +-
 .../ozone/container/keyvalue/KeyValueHandler.java | 52 +-
 .../container/keyvalue/helpers/ChunkUtils.java | 18 +-
 .../keyvalue/helpers/KeyValueContainerUtil.java | 9 +-
 .../keyvalue/impl/FilePerBlockStrategy.java | 9 +-
 .../container/ozoneimpl/ContainerController.java | 24 +-
 .../ozone/container/ozoneimpl/ContainerReader.java | 161 ++---
 .../ozone/container/ozoneimpl/OzoneContainer.java | 5 +-
 .../replication/GrpcReplicationClient.java | 4 +-
 .../container/replication/ReplicationServer.java | 7 +-
 .../upgrade/DataNodeUpgradeFinalizer.java | 5 +-
 .../upgrade/DatanodeMetadataFeatures.java | 53 --
 .../ScmHAFinalizeUpgradeActionDatanode.java | 129 ++++
 .../ScmHAFirstUpgradeLayoutChangeAction.java | 45 --
 .../upgrade/VersionedDatanodeFeatures.java | 145 ++++
 .../hadoop/ozone/container/common/ScmTestMock.java | 9 +
 .../ozone/container/common/TestContainerCache.java | 23 +-
 .../common/TestKeyValueContainerData.java | 8 +-
 .../common/impl/TestContainerDataYaml.java | 43 +-
 .../TestCloseContainerCommandHandler.java | 33 +-
 .../upgrade/TestDatanodeUpgradeToScmHA.java | 741 +++++++++++++++++++++
 hadoop-hdds/docs/content/concept/OzoneManager.md | 2 +-
 .../docs/content/concept/OzoneManager.zh.md | 2 +-
 hadoop-hdds/docs/content/feature/SCM-HA.md | 4 +-
 hadoop-hdds/docs/content/interface/ReconApi.md | 217 +++++-
 .../docs/themes/ozonedoc/layouts/index.html | 2 +
 .../hadoop/hdds/server/http/HttpServer2.java | 17 +-
 .../hadoop/hdds/server/http/TestHttpServer2.java | 54 ++
 .../src/main/proto/DatanodeClientProtocol.proto | 1 +
 .../hadoop/hdds/scm/block/DeletedBlockLogImpl.java | 7 +-
 .../hdds/scm/container/ContainerManagerImpl.java | 67 +-
 .../scm/container/balancer/ContainerBalancer.java | 8 +-
 .../balancer/ContainerBalancerConfiguration.java | 14 +-
 .../replication/ReplicationManagerMetrics.java | 44 +-
 .../org/apache/hadoop/hdds/scm/ha/RatisUtil.java | 7 +-
 .../hadoop/hdds/scm/ha/SCMRatisServerImpl.java | 3 +-
 .../hdds/scm/node/DatanodeAdminMonitorImpl.java | 18 +
 .../hadoop/hdds/scm/pipeline/PipelineManager.java | 20 +
 .../hdds/scm/pipeline/PipelineManagerImpl.java | 51 +-
 .../hdds/scm/pipeline/SCMPipelineManager.java | 20 +
 .../pipeline/WritableRatisContainerProvider.java | 99 ++-
 .../hdds/scm/server/StorageContainerManager.java | 48 +-
 .../TestSCMContainerPlacementRackAware.java | 27 +
 .../hdds/scm/pipeline/MockPipelineManager.java | 20 +
 .../ozone/container/common/TestEndPoint.java | 4 +-
 .../org/apache/hadoop/hdds/cli/OzoneAdmin.java | 7 +
 .../apache/hadoop/ozone/client/rpc/RpcClient.java | 8 +-
 .../apache/hadoop/ozone/om/helpers/OmKeyInfo.java | 36 +-
 hadoop-ozone/dist/src/main/compose/ozone/README.md | 6 +
 .../dist/src/main/compose/ozone/docker-config | 2 +
 hadoop-ozone/dist/src/main/compose/ozone/run.sh | 4 +
 .../compose/ozonesecure-ha/docker-compose.yaml | 2 +
 .../src/main/compose/ozonesecure/docker-config | 2 +-
 hadoop-ozone/dist/src/main/compose/test-all.sh | 3 +-
 hadoop-ozone/dist/src/main/compose/testlib.sh | 23 +-
 .../dist/src/main/compose/upgrade/README.md | 35 +-
 .../compose/upgrade/delete-and-regenerate-data.sh | 53 --
 .../dist/src/main/compose/upgrade/delete-data.sh | 36 -
 hadoop-ozone/dist/src/main/compose/upgrade/test.sh | 9 +-
 .../manual-upgrade/0.5.0-1.1.0/callback.sh | 40 --
 .../upgrades/manual-upgrade}/README.md | 7 +-
 .../{1.0.0-1.1.0 => 1.1.0-1.2.0}/callback.sh | 17 +-
 .../upgrade/upgrades/non-rolling-upgrade/driver.sh | 31 +-
 .../dist/src/main/compose/versions/0.5.0.sh | 26 -
 .../dist/src/main/compose/versions/1.0.0.sh | 26 -
 .../dist/src/main/compose/versions/1.1.0.sh | 26 -
 hadoop-ozone/dist/src/main/compose/xcompat/test.sh | 4 -
 hadoop-ozone/dist/src/main/k8s/examples/testlib.sh | 4 +-
 .../dist/src/main/smoketest/ozonefs/setup.robot | 2 +-
 hadoop-ozone/integration-test/pom.xml | 5 +
 .../org/apache/hadoop/ozone/MiniOzoneCluster.java | 7 +-
 .../apache/hadoop/ozone/MiniOzoneClusterImpl.java | 2 +-
 .../hadoop/ozone/MiniOzoneClusterProvider.java | 283 ++++++++
 .../hadoop/ozone/MiniOzoneHAClusterImpl.java | 21 +-
 .../hadoop/ozone/TestOzoneConfigurationFields.java | 11 +-
 .../ozone/container/TestContainerReplication.java | 47 --
 .../container/ozoneimpl/TestOzoneContainer.java | 8 +-
 .../ozone/dn/TestDatanodeLayoutUpgradeTool.java | 131 ----
 .../hadoop/ozone/freon/TestRandomKeyGenerator.java | 3 -
 .../hadoop/ozone/om/TestOzoneManagerHAWithACL.java | 24 +-
 .../ozone/om/TestOzoneManagerHAWithData.java | 32 +-
 .../hadoop/ozone/scm/TestXceiverClientManager.java | 18 +-
 .../scm/node/TestDecommissionAndMaintenance.java | 124 ++--
 .../hadoop/ozone/shell/TestNSSummaryAdmin.java | 72 ++
 .../ozone/om/request/file/OMFileCreateRequest.java | 3 +-
 .../request/file/OMFileCreateRequestWithFSO.java | 5 +-
 .../ozone/om/request/key/OMKeyCreateRequest.java | 7 +-
 .../om/request/key/OMKeyCreateRequestWithFSO.java | 3 +-
 .../ozone/om/upgrade/OMLayoutVersionManager.java | 3 +-
 .../om/request/key/TestOMKeyCreateRequest.java | 70 +-
 .../request/key/TestOMKeyCreateRequestWithFSO.java | 13 +
 .../org/apache/hadoop/fs/ozone/OzoneFsShell.java | 4 +-
 .../apache/hadoop/ozone/recon/ReconHttpServer.java | 6 +-
 .../hadoop/ozone/recon/ReconServerConfigKeys.java | 4 -
 .../hadoop/ozone/recon/api/NSSummaryEndpoint.java | 137 ++--
 .../hadoop/ozone/recon/api/types/DUResponse.java | 14 +
 .../ozone/recon/api/types/ResponseStatus.java | 1 +
 .../ozone/recon/scm/ReconPipelineManager.java | 12 +-
 .../webapps/recon/ozone-recon-web/api/db.json | 324 ++++-
.../webapps/recon/ozone-recon-web/api/routes.json | 22 +- .../src/components/navBar/navBar.tsx | 5 + .../src/components/rightDrawer/rightDrawer.tsx | 72 ++ .../src/constants/breadcrumbs.constants.tsx | 3 +- .../webapps/recon/ozone-recon-web/src/routes.tsx | 5 + .../diskUsage/diskUsage.less} | 41 +- .../src/views/diskUsage/diskUsage.tsx | 444 ++++++++++++ .../src/views/overview/overview.tsx | 3 +- .../ozone/admin/nssummary/DiskUsageSubCommand.java | 214 ++++++ .../admin/nssummary/FileSizeDistSubCommand.java | 129 ++++ .../ozone/admin/nssummary/NSSummaryAdmin.java | 137 ++++ .../ozone/admin/nssummary/NSSummaryCLIUtils.java | 174 +++++ .../admin/nssummary/QuotaUsageSubCommand.java | 117 ++++ .../ozone/admin/nssummary/SummarySubCommand.java | 115 ++++ .../ozone/admin/nssummary/package-info.java} | 14 +- .../admin/scm/GetScmRatisRolesSubcommand.java | 7 +- .../apache/hadoop/ozone/debug/DatanodeLayout.java | 110 --- .../ozone/shell/volume/CreateVolumeHandler.java | 4 +- pom.xml | 2 +- 144 files changed, 4835 insertions(+), 1364 deletions(-) diff --cc hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java index cbbe371,00e819f..521d563 --- a/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java +++ b/hadoop-hdds/container-service/src/main/java/org/apache/hadoop/ozone/container/common/helpers/ContainerUtils.java @@@ -186,23 -187,27 +188,29 @@@ public final class ContainerUtils * Verify that the checksum stored in containerData is equal to the * computed checksum. */ - public static void verifyChecksum(ContainerData containerData) - throws IOException { - String storedChecksum = containerData.getChecksum(); - - Yaml yaml = ContainerDataYaml.getYamlForContainerType( - containerData.getContainerType(), - containerData instanceof KeyValueContainerData && - ((KeyValueContainerData)containerData).getReplicaIndex() > 0); - containerData.computeAndSetChecksum(yaml); - String computedChecksum = containerData.getChecksum(); - - if (storedChecksum == null || !storedChecksum.equals(computedChecksum)) { - throw new StorageContainerException("Container checksum error for " + - "ContainerID: " + containerData.getContainerID() + ". " + - "\nStored Checksum: " + storedChecksum + - "\nExpected Checksum: " + computedChecksum, - CONTAINER_CHECKSUM_ERROR); + public static void verifyChecksum(ContainerData containerData, + ConfigurationSource conf) throws IOException { + boolean enabled = conf.getBoolean( + HddsConfigKeys.HDDS_CONTAINER_CHECKSUM_VERIFICATION_ENABLED, + HddsConfigKeys. + HDDS_CONTAINER_CHECKSUM_VERIFICATION_ENABLED_DEFAULT); + if(enabled) { + String storedChecksum = containerData.getChecksum(); + + Yaml yaml = ContainerDataYaml.getYamlForContainerType( - containerData.getContainerType()); ++ containerData.getContainerType(), ++ containerData instanceof KeyValueContainerData && ++ ((KeyValueContainerData)containerData).getReplicaIndex() > 0); + containerData.computeAndSetChecksum(yaml); + String computedChecksum = containerData.getChecksum(); + + if (storedChecksum == null || !storedChecksum.equals(computedChecksum)) { + throw new StorageContainerException("Container checksum error for " + + "ContainerID: " + containerData.getContainerID() + ". 
" + + "\nStored Checksum: " + storedChecksum + + "\nExpected Checksum: " + computedChecksum, + CONTAINER_CHECKSUM_ERROR); + } } } diff --cc hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDataYaml.java index a9642e7,4dc38e9..67a21da --- a/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDataYaml.java +++ b/hadoop-hdds/container-service/src/test/java/org/apache/hadoop/ozone/container/common/impl/TestContainerDataYaml.java @@@ -27,7 -28,7 +29,8 @@@ import org.apache.hadoop.ozone.OzoneCon import org.apache.hadoop.ozone.container.common.helpers.ContainerUtils; import org.apache.hadoop.ozone.container.keyvalue.ChunkLayoutTestInfo; import org.apache.hadoop.ozone.container.keyvalue.KeyValueContainerData; +import org.junit.Assert; + import org.apache.hadoop.ozone.container.upgrade.VersionedDatanodeFeatures; import org.apache.ozone.test.GenericTestUtils; import org.junit.Test; import org.junit.runner.RunWith; @@@ -90,8 -90,8 +94,9 @@@ public class TestContainerDataYaml keyValueContainerData.setMetadataPath(testRoot); keyValueContainerData.setChunksPath(testRoot); keyValueContainerData.updateDataScanTime(SCAN_TIME); - keyValueContainerData.setSchemaVersion(OzoneConsts.SCHEMA_LATEST); + keyValueContainerData.setSchemaVersion( + VersionedDatanodeFeatures.SchemaV2.chooseSchemaVersion()); + keyValueContainerData.setReplicaIndex(replicaIndex); File containerFile = new File(testRoot, containerPath); @@@ -135,9 -135,8 +140,9 @@@ kvData.lastDataScanTime().get().toEpochMilli()); assertEquals(SCAN_TIME.toEpochMilli(), kvData.getDataScanTimestamp().longValue()); - assertEquals(OzoneConsts.SCHEMA_LATEST, + assertEquals(VersionedDatanodeFeatures.SchemaV2.chooseSchemaVersion(), - kvData.getSchemaVersion()); + kvData.getSchemaVersion()); + assertEquals(7, kvData.getReplicaIndex()); // Update ContainerData. kvData.addMetadata(OzoneConsts.VOLUME, VOLUME_OWNER); @@@ -249,24 -233,7 +254,24 @@@ public void testChecksumInContainerFile() throws IOException { long containerID = testContainerID++; - File containerFile = createContainerFile(containerID); + File containerFile = createContainerFile(containerID, 0); + + // Read from .container file, and verify data. + KeyValueContainerData kvData = (KeyValueContainerData) ContainerDataYaml + .readContainerFile(containerFile); - ContainerUtils.verifyChecksum(kvData); ++ ContainerUtils.verifyChecksum(kvData, conf); + + cleanup(); + } + + /** + * Test to verify {@link ContainerUtils#verifyChecksum(ContainerData)}. + */ + @Test + public void testChecksumInContainerFileWithReplicaIndex() throws IOException { + long containerID = testContainerID++; + + File containerFile = createContainerFile(containerID, 10); // Read from .container file, and verify data. 
KeyValueContainerData kvData = (KeyValueContainerData) ContainerDataYaml diff --cc hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java index dd9f514,c6651f4..4d76cb7 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/PipelineManagerImpl.java @@@ -173,10 -163,9 +172,10 @@@ public class PipelineManagerImpl implem throw new IOException(message); } - lock.lock(); + acquireWriteLock(); try { - Pipeline pipeline = pipelineFactory.create(replicationConfig); + Pipeline pipeline = pipelineFactory.create(replicationConfig, + excludedNodes, favoredNodes); stateManager.addPipeline(pipeline.getProtobufMessage( ClientVersions.CURRENT_VERSION)); recordMetricsForPipeline(pipeline); diff --cc hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java index 4ef5fcd,0000000..dba57df mode 100644,000000..100644 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/pipeline/SCMPipelineManager.java @@@ -1,822 -1,0 +1,842 @@@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.hdds.scm.pipeline; + +import javax.management.ObjectName; +import java.io.IOException; +import java.time.Duration; +import java.time.Instant; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.NavigableSet; +import java.util.Set; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.locks.ReadWriteLock; +import java.util.concurrent.locks.ReentrantReadWriteLock; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hdds.HddsConfigKeys; +import org.apache.hadoop.hdds.client.RatisReplicationConfig; +import org.apache.hadoop.hdds.client.ReplicationConfig; +import org.apache.hadoop.hdds.conf.ConfigurationSource; +import org.apache.hadoop.hdds.protocol.DatanodeDetails; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; +import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationType; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.container.ContainerID; +import org.apache.hadoop.hdds.scm.events.SCMEvents; +import org.apache.hadoop.hdds.scm.exceptions.SCMException; +import org.apache.hadoop.hdds.scm.ha.SCMContext; +import org.apache.hadoop.hdds.scm.node.NodeManager; +import org.apache.hadoop.hdds.scm.safemode.SCMSafeModeManager.SafeModeStatus; +import org.apache.hadoop.hdds.server.events.EventHandler; +import org.apache.hadoop.hdds.server.events.EventPublisher; +import org.apache.hadoop.hdds.utils.Scheduler; +import org.apache.hadoop.hdds.utils.db.Table; +import org.apache.hadoop.hdds.utils.db.Table.KeyValue; +import org.apache.hadoop.hdds.utils.db.TableIterator; +import org.apache.hadoop.metrics2.util.MBeans; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.hadoop.hdds.scm.exceptions.SCMException.ResultCodes.FAILED_TO_FIND_SUITABLE_NODE; + +/** + * Implements api needed for management of pipelines. All the write operations + * for pipelines must come via PipelineManager. It synchronises all write + * and read operations via a ReadWriteLock. + */ +public class SCMPipelineManager implements + PipelineManager, EventHandler<SafeModeStatus> { + + private static final Logger LOG = + LoggerFactory.getLogger(SCMPipelineManager.class); + + private final ReadWriteLock lock; + private PipelineFactory pipelineFactory; + private StateManager stateManager; + private final BackgroundPipelineCreator backgroundPipelineCreator; + private Scheduler scheduler; + + private final EventPublisher eventPublisher; + private final NodeManager nodeManager; + private final SCMPipelineMetrics metrics; + private final ConfigurationSource conf; + private long pipelineWaitDefaultTimeout; + // Pipeline Manager MXBean + private ObjectName pmInfoBean; + + private Table<PipelineID, Pipeline> pipelineStore; + + private final AtomicBoolean isInSafeMode; + // Used to track if the safemode pre-checks have completed. This is designed + // to prevent pipelines being created until sufficient nodes have registered. + private final AtomicBoolean pipelineCreationAllowed; + + // This allows for freezing/resuming the new pipeline creation while the + // SCM is already out of SafeMode. 
+ private AtomicBoolean freezePipelineCreation; + + public SCMPipelineManager(ConfigurationSource conf, + NodeManager nodeManager, + Table<PipelineID, Pipeline> pipelineStore, + EventPublisher eventPublisher) + throws IOException { + this(conf, nodeManager, pipelineStore, eventPublisher, null, null); + this.stateManager = new PipelineStateManager(); + this.pipelineFactory = new PipelineFactory(nodeManager, + stateManager, conf, eventPublisher, SCMContext.emptyContext()); + this.pipelineStore = pipelineStore; + initializePipelineState(); + } + + protected SCMPipelineManager(ConfigurationSource conf, + NodeManager nodeManager, + Table<PipelineID, Pipeline> pipelineStore, + EventPublisher eventPublisher, + PipelineStateManager pipelineStateManager, + PipelineFactory pipelineFactory) + throws IOException { + this.lock = new ReentrantReadWriteLock(); + this.pipelineStore = pipelineStore; + this.conf = conf; + this.pipelineFactory = pipelineFactory; + this.stateManager = pipelineStateManager; + // TODO: See if thread priority needs to be set for these threads + scheduler = new Scheduler("RatisPipelineUtilsThread", false, 1); + this.backgroundPipelineCreator = + new BackgroundPipelineCreator(this, scheduler, conf); + this.eventPublisher = eventPublisher; + this.nodeManager = nodeManager; + this.metrics = SCMPipelineMetrics.create(); + this.pmInfoBean = MBeans.register("SCMPipelineManager", + "SCMPipelineManagerInfo", this); + this.pipelineWaitDefaultTimeout = conf.getTimeDuration( + HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL, + HddsConfigKeys.HDDS_PIPELINE_REPORT_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS); + this.isInSafeMode = new AtomicBoolean(conf.getBoolean( + HddsConfigKeys.HDDS_SCM_SAFEMODE_ENABLED, + HddsConfigKeys.HDDS_SCM_SAFEMODE_ENABLED_DEFAULT)); + // Pipeline creation is only allowed after the safemode prechecks have + // passed, eg sufficient nodes have registered. + this.pipelineCreationAllowed = new AtomicBoolean(!this.isInSafeMode.get()); + // controls freezing/resuming pipeline creation regardless of SafeMode + // status. + this.freezePipelineCreation = new AtomicBoolean(false); + } + + public StateManager getStateManager() { + return stateManager; + } + + @VisibleForTesting + public void setPipelineProvider(ReplicationType replicationType, + PipelineProvider provider) { + pipelineFactory.setProvider(replicationType, provider); + } + + @VisibleForTesting + public void allowPipelineCreation() { + this.pipelineCreationAllowed.set(true); + } + + @VisibleForTesting + public boolean isPipelineCreationAllowed() { + return pipelineCreationAllowed.get(); + } + + protected void initializePipelineState() throws IOException { + if (pipelineStore.isEmpty()) { + LOG.info("No pipeline exists in current db"); + return; + } + TableIterator<PipelineID, ? extends KeyValue<PipelineID, Pipeline>> + iterator = pipelineStore.iterator(); + while (iterator.hasNext()) { + Pipeline pipeline = nextPipelineFromIterator(iterator); + stateManager.addPipeline(pipeline); + nodeManager.addPipeline(pipeline); + } + } + + private Pipeline nextPipelineFromIterator( + TableIterator<PipelineID, ? extends KeyValue<PipelineID, Pipeline>> it + ) throws IOException { + KeyValue<PipelineID, Pipeline> actual = it.next(); + Pipeline pipeline = actual.getValue(); + PipelineID pipelineID = actual.getKey(); + checkKeyAndReplaceIfObsolete(it, pipeline, pipelineID); + return pipeline; + } + + /** + * This method is part of the change that happens in HDDS-3925, and we can + * and should remove this on later on. 
+ * The purpose of the change is to get rid of protobuf serialization in the + * SCM database Pipeline table keys. The keys are not used anywhere, and the + * PipelineID that is used as a key is in the value as well, so we can detect + * a change in the key translation to byte[] and if we have the old format + * we refresh the table contents during SCM startup. + * + * If this fails in the remove, then there is an IOException coming from + * RocksDB itself, in this case in memory structures will still be fine and + * SCM should be operational, however we will attempt to replace the old key + * at next startup. In this case removing of the pipeline will leave the + * pipeline in RocksDB, and during next startup we will attempt to delete it + * again. This does not affect any runtime operations. + * If a Pipeline should have been deleted but remained in RocksDB, then at + * next startup it will be replaced and added with the new key, then SCM will + * detect that it is an invalid Pipeline and successfully delete it with the + * new key. + * For further info check the JIRA. + * + * @param it the iterator used to iterate the Pipeline table + * @param pipeline the pipeline read already from the iterator + * @param pipelineID the pipeline ID read from the raw data via the iterator + */ + private void checkKeyAndReplaceIfObsolete( + TableIterator<PipelineID, ? extends KeyValue<PipelineID, Pipeline>> it, + Pipeline pipeline, + PipelineID pipelineID + ) { + if (!pipelineID.equals(pipeline.getId())) { + try { + LOG.info("Found pipeline in old format key : {}", pipeline.getId()); + it.removeFromDB(); + pipelineStore.put(pipeline.getId(), pipeline); + } catch (IOException e) { + LOG.info("Pipeline table in RocksDB has an old key format, and " + + "removing the pipeline with the old key was unsuccessful." + + "Pipeline: {}", pipeline); + } + } + } + + private void recordMetricsForPipeline(Pipeline pipeline) { + metrics.incNumPipelineAllocated(); + if (pipeline.isOpen()) { + metrics.incNumPipelineCreated(); + metrics.createPerPipelineMetrics(pipeline); + } + switch (pipeline.getType()) { + case STAND_ALONE: + return; + case RATIS: + List<Pipeline> overlapPipelines = RatisPipelineUtils + .checkPipelineContainSameDatanodes(stateManager, pipeline); + if (!overlapPipelines.isEmpty()) { + // Count 1 overlap at a time. + metrics.incNumPipelineContainSameDatanodes(); + //TODO remove until pipeline allocation is proved equally distributed. + for (Pipeline overlapPipeline : overlapPipelines) { + LOG.info("Pipeline: " + pipeline.getId().toString() + + " contains same datanodes as previous pipelines: " + + overlapPipeline.getId().toString() + " nodeIds: " + + pipeline.getNodes().get(0).getUuid().toString() + + ", " + pipeline.getNodes().get(1).getUuid().toString() + + ", " + pipeline.getNodes().get(2).getUuid().toString()); + } + } + return; + case CHAINED: + // Not supported. + default: + // Not supported. 
+ return; + } + } + + @Override + public Pipeline createPipeline(ReplicationConfig replicationConfig) + throws IOException { + return createPipeline(replicationConfig, Collections.emptyList(), + Collections.emptyList()); + } + + @Override + public Pipeline createPipeline(ReplicationConfig replicationConfig, + List<DatanodeDetails> excludedNodes, List<DatanodeDetails> favoredNodes) + throws IOException { + if (!isPipelineCreationAllowed() + && replicationConfig.getRequiredNodes() != 1) { + LOG.debug("Pipeline creation is not allowed until safe mode prechecks " + + "complete"); + throw new IOException("Pipeline creation is not allowed as safe mode " + + "prechecks have not yet passed"); + } + if (freezePipelineCreation.get()) { + LOG.debug("Pipeline creation is frozen while an upgrade is in " + + "progress"); + throw new IOException("Pipeline creation is frozen while an upgrade " + + "is in progress"); + } + lock.writeLock().lock(); + try { + Pipeline pipeline = pipelineFactory.create(replicationConfig, + excludedNodes, favoredNodes); + if (pipelineStore != null) { + pipelineStore.put(pipeline.getId(), pipeline); + } + stateManager.addPipeline(pipeline); + nodeManager.addPipeline(pipeline); + recordMetricsForPipeline(pipeline); + return pipeline; + } catch (IOException ex) { + if (ex instanceof SCMException && + ((SCMException) ex).getResult() == FAILED_TO_FIND_SUITABLE_NODE) { + // Avoid spam SCM log with errors when SCM has enough open pipelines + LOG.debug("Can't create more pipelines of replicationConfig: {}. " + + "Reason: {}", replicationConfig, ex.getMessage()); + } else { + LOG.error("Failed to create pipeline of replicationConfig: {}. " + + "Exception: {}", replicationConfig, ex.getMessage()); + } + metrics.incNumPipelineCreationFailed(); + throw ex; + } finally { + lock.writeLock().unlock(); + } + } + + @Override + public Pipeline createPipeline(ReplicationConfig replicationConfig, + List<DatanodeDetails> nodes) { + // This will mostly be used to create dummy pipeline for SimplePipelines. + // We don't update the metrics for SimplePipelines. 
+ lock.writeLock().lock(); + try { + return pipelineFactory.create(replicationConfig, nodes); + } finally { + lock.writeLock().unlock(); + } + } + + @Override + public Pipeline getPipeline(PipelineID pipelineID) + throws PipelineNotFoundException { + lock.readLock().lock(); + try { + return stateManager.getPipeline(pipelineID); + } finally { + lock.readLock().unlock(); + } + } + + @Override + public boolean containsPipeline(PipelineID pipelineID) { + lock.readLock().lock(); + try { + getPipeline(pipelineID); + return true; + } catch (PipelineNotFoundException e) { + return false; + } finally { + lock.readLock().unlock(); + } + } + + @Override + public List<Pipeline> getPipelines() { + lock.readLock().lock(); + try { + return stateManager.getPipelines(); + } finally { + lock.readLock().unlock(); + } + } + + @Override + public List<Pipeline> getPipelines(ReplicationConfig replicationConfig) { + lock.readLock().lock(); + try { + return stateManager.getPipelines(replicationConfig); + } finally { + lock.readLock().unlock(); + } + } + + @Override + public List<Pipeline> getPipelines(ReplicationConfig replicationConfig, + Pipeline.PipelineState state) { + lock.readLock().lock(); + try { + return stateManager.getPipelines(replicationConfig, state); + } finally { + lock.readLock().unlock(); + } + } + + @Override + public List<Pipeline> getPipelines(ReplicationConfig replicationConfig, + Pipeline.PipelineState state, + Collection<DatanodeDetails> excludeDns, + Collection<PipelineID> excludePipelines) { + lock.readLock().lock(); + try { + return stateManager + .getPipelines(replicationConfig, state, excludeDns, excludePipelines); + } finally { + lock.readLock().unlock(); + } + } + + @Override + public int getPipelineCount(ReplicationConfig replicationConfig, + Pipeline.PipelineState state) { + lock.readLock().lock(); + try { + return stateManager.getPipelineCount(replicationConfig, state); + } finally { + lock.readLock().unlock(); + } + } + + @Override + public void addContainerToPipeline(PipelineID pipelineID, + ContainerID containerID) throws IOException { + lock.writeLock().lock(); + try { + stateManager.addContainerToPipeline(pipelineID, containerID); + } finally { + lock.writeLock().unlock(); + } + } + + private void updatePipelineStateInDb(PipelineID pipelineId, + Pipeline.PipelineState oldState) + throws IOException { + // null check is here to prevent the case where SCM store + // is closed but the staleNode handlers/pipleine creations + // still try to access it. 
+ if (pipelineStore != null) { + try { + pipelineStore.put(pipelineId, getPipeline(pipelineId)); + } catch (IOException ex) { + LOG.warn("Pipeline {} state update failed", pipelineId); + // revert back to old state in memory + stateManager.updatePipelineState(pipelineId, oldState); + } + } + } + + @Override + public void removeContainerFromPipeline(PipelineID pipelineID, + ContainerID containerID) throws IOException { + lock.writeLock().lock(); + try { + stateManager.removeContainerFromPipeline(pipelineID, containerID); + } finally { + lock.writeLock().unlock(); + } + } + + @Override + public NavigableSet<ContainerID> getContainersInPipeline( + PipelineID pipelineID) throws IOException { + lock.readLock().lock(); + try { + return stateManager.getContainers(pipelineID); + } finally { + lock.readLock().unlock(); + } + } + + @Override + public int getNumberOfContainers(PipelineID pipelineID) throws IOException { + return stateManager.getNumberOfContainers(pipelineID); + } + + @Override + public void openPipeline(PipelineID pipelineId) throws IOException { + lock.writeLock().lock(); + try { + Pipeline.PipelineState state = stateManager. + getPipeline(pipelineId).getPipelineState(); + Pipeline pipeline = stateManager.openPipeline(pipelineId); + updatePipelineStateInDb(pipelineId, state); + metrics.incNumPipelineCreated(); + metrics.createPerPipelineMetrics(pipeline); + } finally { + lock.writeLock().unlock(); + } + } + + /** + * Fire events to close all containers related to the input pipeline. + * @param pipelineId - ID of the pipeline. + * @throws IOException + */ + protected void closeContainersForPipeline(final PipelineID pipelineId) + throws IOException { + Set<ContainerID> containerIDs = stateManager.getContainers(pipelineId); + for (ContainerID containerID : containerIDs) { + eventPublisher.fireEvent(SCMEvents.CLOSE_CONTAINER, containerID); + } + } + + /** + * put pipeline in CLOSED state. + * @param pipeline - ID of the pipeline. + * @param onTimeout - whether to remove pipeline after some time. + * @throws IOException + */ + @Override + public void closePipeline(Pipeline pipeline, boolean onTimeout) + throws IOException { + PipelineID pipelineID = pipeline.getId(); + lock.writeLock().lock(); + try { + if (!pipeline.isClosed()) { + stateManager.updatePipelineState(pipelineID, + Pipeline.PipelineState.CLOSED); + LOG.info("Pipeline {} moved to CLOSED state", pipeline); + } + metrics.removePipelineMetrics(pipelineID); + } finally { + lock.writeLock().unlock(); + } + // close containers. + closeContainersForPipeline(pipelineID); + if (!onTimeout) { + // close pipeline right away. + removePipeline(pipeline); + } + } + + /** + * Scrub pipelines. + */ + @Override + public void scrubPipeline(ReplicationConfig replicationConfig) + throws IOException { + if (!RatisReplicationConfig.hasFactor(replicationConfig, + ReplicationFactor.THREE)) { + // Only srub pipeline for RATIS THREE pipeline + return; + } + Instant currentTime = Instant.now(); + Long pipelineScrubTimeoutInMills = conf.getTimeDuration( + ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT, + ScmConfigKeys.OZONE_SCM_PIPELINE_ALLOCATED_TIMEOUT_DEFAULT, + TimeUnit.MILLISECONDS); + + List<Pipeline> candidates = stateManager.getPipelines(replicationConfig); + + for (Pipeline p : candidates) { + // scrub pipelines who stay ALLOCATED for too long. 
+ if (p.getPipelineState() == Pipeline.PipelineState.ALLOCATED && + (currentTime.toEpochMilli() - p.getCreationTimestamp() + .toEpochMilli() >= pipelineScrubTimeoutInMills)) { + LOG.info("Scrubbing pipeline: id: " + p.getId().toString() + + " since it stays at ALLOCATED stage for " + + Duration.between(currentTime, p.getCreationTimestamp()) + .toMinutes() + " mins."); + closePipeline(p, false); + } + // scrub pipelines who stay CLOSED for too long. + if (p.getPipelineState() == Pipeline.PipelineState.CLOSED) { + LOG.info("Scrubbing pipeline: id: " + p.getId().toString() + + " since it is at CLOSED stage."); + closeContainersForPipeline(p.getId()); + removePipeline(p); + } + } + return; + } + + @Override + public Map<String, Integer> getPipelineInfo() { + final Map<String, Integer> pipelineInfo = new HashMap<>(); + for (Pipeline.PipelineState state : Pipeline.PipelineState.values()) { + pipelineInfo.put(state.toString(), 0); + } + stateManager.getPipelines().forEach(pipeline -> + pipelineInfo.computeIfPresent( + pipeline.getPipelineState().toString(), (k, v) -> v + 1)); + return pipelineInfo; + } + + /** + * Schedules a fixed interval job to create pipelines. + */ + @Override + public void startPipelineCreator() { + backgroundPipelineCreator.startFixedIntervalPipelineCreator(); + } + + /** + * Triggers pipeline creation after the specified time. + */ + @Override + public void triggerPipelineCreation() { + backgroundPipelineCreator.triggerPipelineCreation(); + } + + /** + * Activates a dormant pipeline. + * + * @param pipelineID ID of the pipeline to activate. + * @throws IOException in case of any Exception + */ + @Override + public void activatePipeline(PipelineID pipelineID) + throws IOException { + lock.writeLock().lock(); + try { + Pipeline.PipelineState state = stateManager. + getPipeline(pipelineID).getPipelineState(); + stateManager.activatePipeline(pipelineID); + updatePipelineStateInDb(pipelineID, state); + } finally { + lock.writeLock().unlock(); + } + } + + /** + * Deactivates an active pipeline. + * + * @param pipelineID ID of the pipeline to deactivate. + * @throws IOException in case of any Exception + */ + @Override + public void deactivatePipeline(PipelineID pipelineID) + throws IOException { + lock.writeLock().lock(); + try { + Pipeline.PipelineState state = stateManager. + getPipeline(pipelineID).getPipelineState(); + stateManager.deactivatePipeline(pipelineID); + updatePipelineStateInDb(pipelineID, state); + } finally { + lock.writeLock().unlock(); + } + } + + /** + * Wait a pipeline to be OPEN. + * + * @param pipelineID ID of the pipeline to wait for. 
+ * @param timeout wait timeout, millisecond, 0 to use default value + * @throws IOException in case of any Exception, such as timeout + */ + @Override + public void waitPipelineReady(PipelineID pipelineID, long timeout) + throws IOException { + long st = Time.monotonicNow(); + if (timeout == 0) { + timeout = pipelineWaitDefaultTimeout; + } + + boolean ready; + Pipeline pipeline; + do { + try { + pipeline = stateManager.getPipeline(pipelineID); + } catch (PipelineNotFoundException e) { + throw new PipelineNotFoundException(String.format( + "Pipeline %s cannot be found", pipelineID)); + } + ready = pipeline.isOpen(); + if (!ready) { + try { + Thread.sleep((long)100); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } while (!ready && Time.monotonicNow() - st < timeout); + + if (!ready) { + throw new IOException(String.format("Pipeline %s is not ready in %d ms", + pipelineID, timeout)); + } + } + + /** + * Removes the pipeline from the db and pipeline state map. + * + * @param pipeline - pipeline to be removed + * @throws IOException + */ + protected void removePipeline(Pipeline pipeline) throws IOException { + pipelineFactory.close(pipeline.getType(), pipeline); + PipelineID pipelineID = pipeline.getId(); + lock.writeLock().lock(); + try { + if (pipelineStore != null) { + pipelineStore.delete(pipelineID); + Pipeline pipelineRemoved = stateManager.removePipeline(pipelineID); + nodeManager.removePipeline(pipelineRemoved); + metrics.incNumPipelineDestroyed(); + } + } catch (IOException ex) { + metrics.incNumPipelineDestroyFailed(); + throw ex; + } finally { + lock.writeLock().unlock(); + } + } + + @Override + public void incNumBlocksAllocatedMetric(PipelineID id) { + metrics.incNumBlocksAllocated(id); + } + + @Override + public void close() throws IOException { + if (scheduler != null) { + scheduler.close(); + scheduler = null; + } + + if(pmInfoBean != null) { + MBeans.unregister(this.pmInfoBean); + pmInfoBean = null; + } + + SCMPipelineMetrics.unRegister(); + + // shutdown pipeline provider. + pipelineFactory.shutdown(); + lock.writeLock().lock(); + try { + if (pipelineStore != null) { + pipelineStore.close(); + pipelineStore = null; + } + } catch (Exception ex) { + LOG.error("Pipeline store close failed", ex); + } finally { + lock.writeLock().unlock(); + } + } + + /** + * returns min number of healthy volumes from the set of + * datanodes constituting the pipeline. + * @param pipeline + * @return healthy volume count + */ + @Override + public int minHealthyVolumeNum(Pipeline pipeline) { + return nodeManager.minHealthyVolumeNum(pipeline.getNodes()); + } + + /** + * returns max count of raft log volumes from the set of + * datanodes constituting the pipeline. 
+ * @param pipeline + * @return healthy volume count + */ + @Override + public int minPipelineLimit(Pipeline pipeline) { + return nodeManager.minPipelineLimit(pipeline.getNodes()); + } + + protected ReadWriteLock getLock() { + return lock; + } + + @VisibleForTesting + public PipelineFactory getPipelineFactory() { + return pipelineFactory; + } + + protected NodeManager getNodeManager() { + return nodeManager; + } + + @Override + public boolean getSafeModeStatus() { + return this.isInSafeMode.get(); + } + + @Override + public void reinitialize(Table<PipelineID, Pipeline> store) + throws IOException { + throw new RuntimeException("Not supported operation."); + } + + @Override + public void freezePipelineCreation() { + freezePipelineCreation.set(true); + backgroundPipelineCreator.pause(); + } + + @Override + public void resumePipelineCreation() { + freezePipelineCreation.set(false); + backgroundPipelineCreator.resume(); + } + ++ @Override ++ public void acquireReadLock() { ++ ++ } ++ ++ @Override ++ public void releaseReadLock() { ++ ++ } ++ ++ @Override ++ public void acquireWriteLock() { ++ ++ } ++ ++ @Override ++ public void releaseWriteLock() { ++ ++ } ++ + public Table<PipelineID, Pipeline> getPipelineStore() { + return pipelineStore; + } + + @Override + public void onMessage(SafeModeStatus status, + EventPublisher publisher) { + // TODO: #CLUTIL - handle safemode getting re-enabled + boolean currentAllowPipelines = + pipelineCreationAllowed.getAndSet(status.isPreCheckComplete()); + boolean currentlyInSafeMode = + isInSafeMode.getAndSet(status.isInSafeMode()); + + // Trigger pipeline creation only if the preCheck status has changed to + // complete. + if (isPipelineCreationAllowed() && !currentAllowPipelines) { + triggerPipelineCreation(); + } + // Start the pipeline creation thread only when safemode switches off + if (!getSafeModeStatus() && currentlyInSafeMode) { + startPipelineCreator(); + } + } + + @VisibleForTesting + protected static Logger getLog() { + return LOG; + } +} diff --cc hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java index 1ee7bce,793dd30..55715bd --- a/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java +++ b/hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/TestOzoneConfigurationFields.java @@@ -64,11 -65,8 +66,13 @@@ public class TestOzoneConfigurationFiel ".duration"); // Deprecated config configurationPropsToSkipCompare .add(ScmConfig.ConfigStrings.HDDS_SCM_INIT_DEFAULT_LAYOUT_VERSION); + // Currently replication and type configs moved to server side. + configurationPropsToSkipCompare + .add(OzoneConfigKeys.OZONE_REPLICATION); + configurationPropsToSkipCompare + .add(OzoneConfigKeys.OZONE_REPLICATION_TYPE); + // This property is tested in TestHttpServer2 instead + xmlPropsToSkipCompare.add(HttpServer2.HTTP_IDLE_TIMEOUT_MS_KEY); addPropertiesNotInXml(); } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
