This is an automated email from the ASF dual-hosted git repository. avijayan pushed a commit to branch HDDS-3698-nonrolling-upgrade in repository https://gitbox.apache.org/repos/asf/ozone.git
commit 039afb37ce6fb4522e6d32de974c0cc96d868592 Merge: 4506883 1d8f972 Author: Aravindan Vijayan <[email protected]> AuthorDate: Thu Jul 1 23:35:16 2021 -0700 Merge remote-tracking branch 'ssh-upstream/master' into ssh-upstream-upgrade-branch .github/workflows/post-commit.yml | 8 +- .../hadoop/hdds/scm/storage/BlockOutputStream.java | 6 +- .../hadoop/hdds/fs/AbstractSpaceUsageSource.java | 7 +- .../hadoop/hdds/fs/DedicatedDiskSpaceUsage.java | 8 +- .../org/apache/hadoop/hdds/scm/ScmConfigKeys.java | 86 +---- .../apache/hadoop/hdds/scm/client/ScmClient.java | 21 + .../hadoop/hdds/scm/container/ContainerInfo.java | 5 + .../hadoop/hdds/scm/ha/SCMHAConfiguration.java | 4 +- .../protocol/StorageContainerLocationProtocol.java | 21 + .../hadoop/hdds/security/x509/crl/CRLStatus.java | 87 +++++ .../org/apache/hadoop/ozone/audit/SCMAction.java | 3 + .../common/src/main/resources/ozone-default.xml | 141 +------ .../server/ratis/ContainerStateMachine.java | 74 ++-- .../common/volume/StorageVolumeChecker.java | 20 +- .../container/stream/DirectoryServerSource.java | 6 +- .../container/stream/DirstreamClientHandler.java | 5 + .../container/stream/DirstreamServerHandler.java | 43 ++- .../ozone/container/stream/StreamingClient.java | 28 +- .../container/stream/StreamingException.java} | 27 +- .../ozone/container/stream/StreamingServer.java | 68 ++-- .../stream/TestDirstreamClientHandler.java | 139 +++++++ .../container/stream/TestStreamingServer.java | 48 +++ .../ozone/container/stream/package-info.java} | 43 +-- hadoop-hdds/docs/content/interface/O3fs.md | 8 + hadoop-hdds/docs/content/interface/O3fs.zh.md | 8 + ...inerLocationProtocolClientSideTranslatorPB.java | 73 ++++ .../SCMBlockLocationFailoverProxyProvider.java | 8 +- .../SCMSecurityProtocolFailoverProxyProvider.java | 8 +- .../scm/update/client/CRLClientUpdateHandler.java | 2 +- .../certificate/authority/CertificateStore.java | 6 + .../authority/PKIProfiles/DefaultProfile.java | 2 +- .../hadoop/hdds/security/x509/crl/CRLCodec.java | 0 .../hadoop/hdds/security/x509/crl/CRLInfo.java | 0 .../hdds/security/x509/crl/CRLInfoCodec.java | 0 .../hadoop/hdds/server/events/EventExecutor.java | 5 + .../hadoop/hdds/server/events/EventQueue.java | 44 ++- ...dExecutor.java => FixedThreadPoolExecutor.java} | 42 +- .../hdds/server/events/SingleThreadExecutor.java | 9 + .../hadoop/hdds/server/http/ProfileServlet.java | 2 +- .../apache/hadoop/hdds/utils/HddsServerUtil.java | 9 + .../x509/certificate/authority/MockCAStore.java | 12 + .../hadoop/hdds/server/events/TestEventQueue.java | 62 ++- .../src/main/proto/ScmAdminProtocol.proto | 50 ++- hadoop-hdds/interface-client/pom.xml | 13 - hadoop-hdds/interface-server/pom.xml | 2 + .../src/main/proto/SCMUpdateProtocol.proto | 0 .../hadoop/hdds/scm/SCMCommonPlacementPolicy.java | 31 +- .../block/DatanodeDeletedBlockTransactions.java | 6 - .../hdds/scm/block/DeletedBlockLogImplV2.java | 1 + .../scm/block/DeletedBlockLogStateManagerImpl.java | 26 +- .../hdds/scm/block/PendingDeleteStatusList.java | 85 ---- .../hdds/scm/block/SCMBlockDeletingService.java | 24 -- .../hdds/scm/container/ContainerReportHandler.java | 38 -- .../scm/container/ContainerStateManagerImpl.java | 2 +- .../hdds/scm/container/ReplicationManager.java | 11 +- .../scm/container/balancer/ContainerBalancer.java | 101 +++-- .../balancer/ContainerBalancerConfiguration.java | 28 ++ .../hdds/scm/crl/CRLStatusReportHandler.java | 87 +++++ .../apache/hadoop/hdds/scm/crl/package-info.java} | 43 +-- .../apache/hadoop/hdds/scm/events/SCMEvents.java | 19 +- .../hadoop/hdds/scm/ha/SCMRatisServerImpl.java | 8 +- .../apache/hadoop/hdds/scm/node/DatanodeInfo.java | 14 + .../hdds/scm/pipeline/PipelinePlacementPolicy.java | 15 +- .../scm/pipeline/PipelineStateManagerV2Impl.java | 4 +- ...inerLocationProtocolServerSideTranslatorPB.java | 70 ++++ .../scm/server/OzoneStorageContainerManager.java | 3 + .../hdds/scm/server/SCMBlockProtocolServer.java | 2 + .../hadoop/hdds/scm/server/SCMCertStore.java | 20 +- .../hdds/scm/server/SCMClientProtocolServer.java | 65 ++++ .../scm/server/SCMDatanodeHeartbeatDispatcher.java | 15 +- .../hdds/scm/server/SCMDatanodeProtocolServer.java | 3 + .../hdds/scm/server/StorageContainerManager.java | 90 +++-- .../java/org/apache/hadoop/hdds/scm/TestUtils.java | 113 ++++-- .../hadoop/hdds/scm/block/TestDeletedBlockLog.java | 96 ++++- .../hadoop/hdds/scm/container/MockNodeManager.java | 8 + .../container/balancer/TestContainerBalancer.java | 20 +- .../algorithms/TestContainerPlacementFactory.java | 11 + .../TestSCMContainerPlacementCapacity.java | 16 +- .../TestSCMContainerPlacementRackAware.java | 22 +- .../TestSCMContainerPlacementRandom.java | 19 +- .../hdds/scm/crl/TestCRLStatusReportHandler.java | 137 +++++++ .../hadoop/hdds/scm/node/TestDeadNodeHandler.java | 49 ++- .../hdds/scm/node/TestNodeReportHandler.java | 21 +- .../hadoop/hdds/scm/node/TestSCMNodeManager.java | 25 +- .../hdds/scm/node/TestSCMNodeStorageStatMap.java | 11 +- .../hadoop/hdds/scm/node/TestStatisticsUpdate.java | 14 +- .../scm/pipeline/TestPipelinePlacementPolicy.java | 33 +- .../hdds/scm/update/server/MockCRLStore.java | 6 +- .../ozone/container/common/TestEndPoint.java | 25 +- .../hdds/scm/cli/ContainerBalancerCommands.java | 108 ++++++ .../scm/cli/ContainerBalancerStartSubcommand.java | 66 ++++ .../cli/ContainerBalancerStatusSubcommand.java} | 38 +- .../scm/cli/ContainerBalancerStopSubcommand.java} | 35 +- .../hdds/scm/cli/ContainerOperationClient.java | 20 + .../datanode/TestContainerBalancerSubCommand.java | 141 +++++++ .../hadoop/ozone/client/MockOmTransport.java | 12 + .../hadoop/ozone/client/TestOzoneClient.java | 34 +- .../src/main/compose/compatibility/docker-config | 1 + .../dist/src/main/compose/ozone-csi/docker-config | 1 + .../dist/src/main/compose/ozone-ha/docker-config | 1 + .../dist/src/main/compose/ozone-mr/common-config | 1 + .../dist/src/main/compose/ozone-mr/test.sh | 6 +- .../src/main/compose/ozone-om-ha/docker-config | 1 + .../src/main/compose/ozone-topology/docker-config | 1 + .../dist/src/main/compose/ozone/docker-config | 1 + .../src/main/compose/ozones3-haproxy/docker-config | 1 + .../src/main/compose/ozonesecure-ha/docker-config | 1 + .../src/main/compose/ozonesecure-mr/docker-config | 2 + .../dist/src/main/compose/restart/docker-config | 1 + hadoop-ozone/dist/src/main/compose/test-all.sh | 8 +- .../compose/upgrade/compose/non-ha/docker-config | 3 +- hadoop-ozone/dist/src/main/compose/upgrade/test.sh | 3 + .../dist/src/main/compose/xcompat/docker-config | 1 + .../src/main/smoketest/ozonefs/hadoopo3fs.robot | 2 + .../apache/hadoop/ozone/MiniOzoneChaosCluster.java | 3 + .../ozone/TestContainerBalancerOperations.java | 112 ++++++ .../commandhandler/TestBlockDeletion.java | 47 --- .../commandhandler/TestCloseContainerHandler.java | 4 + .../commandhandler/TestDeleteContainerHandler.java | 4 + .../TestDatanodeHddsVolumeFailureDetection.java | 4 + .../TestDatanodeHddsVolumeFailureToleration.java | 4 + .../hadoop/ozone/fsck/TestContainerMapper.java | 5 + .../src/main/proto/OmClientProtocol.proto | 1 + .../org/apache/hadoop/ozone/om/KeyManagerImpl.java | 2 +- .../java/org/apache/hadoop/ozone/om/OMMetrics.java | 1 - .../apache/hadoop/ozone/om/TrashPolicyOzone.java | 20 +- .../request/bucket/OMBucketSetPropertyRequest.java | 19 +- .../bucket/TestOMBucketSetPropertyRequest.java | 10 + hadoop-ozone/ozonefs-shaded/pom.xml | 3 + .../hadoop/ozone/recon/api/NodeEndpoint.java | 17 +- .../ozone/recon/api/types/DatanodeMetadata.java | 15 + .../ozone/recon/scm/ReconContainerManager.java | 18 + .../scm/ReconStorageContainerManagerFacade.java | 6 + .../webapps/recon/ozone-recon-web/api/db.json | 20 + .../src/views/datanodes/datanodes.tsx | 11 + .../hadoop/ozone/recon/api/TestEndpoints.java | 3 +- .../ozone/recon/api/TestOpenContainerCount.java | 427 +++++++++++++++++++++ .../ozone/audit/parser/common/DatabaseHelper.java | 7 +- .../hadoop/ozone/freon/StreamingGenerator.java | 23 +- 139 files changed, 3046 insertions(+), 929 deletions(-) diff --cc hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java index 60c3931,31c4472..1701e4c --- a/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java +++ b/hadoop-hdds/common/src/main/java/org/apache/hadoop/hdds/scm/protocol/StorageContainerLocationProtocol.java @@@ -36,10 -36,9 +36,11 @@@ import java.util.EnumSet import java.util.Collections; import java.util.List; import java.util.Map; + import java.util.Optional; import java.util.Set; +import org.apache.hadoop.ozone.upgrade.UpgradeFinalizer.StatusAndMessages; + /** * ContainerLocationProtocol is used by an HDFS node to find the set of nodes * that currently host a container. diff --cc hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto index 6313579,d67dce9..3728a49 --- a/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto +++ b/hadoop-hdds/interface-admin/src/main/proto/ScmAdminProtocol.proto @@@ -69,9 -69,9 +69,11 @@@ message ScmContainerLocationRequest optional DatanodeUsageInfoRequestProto DatanodeUsageInfoRequest = 30; optional GetExistContainerWithPipelinesInBatchRequestProto getExistContainerWithPipelinesInBatchRequest = 31; optional GetContainerTokenRequestProto containerTokenRequest = 32; - optional FinalizeScmUpgradeRequestProto finalizeScmUpgradeRequest = 33; - optional QueryUpgradeFinalizationProgressRequestProto - queryUpgradeFinalizationProgressRequest = 34; + optional StartContainerBalancerRequestProto startContainerBalancerRequest = 33; + optional StopContainerBalancerRequestProto stopContainerBalancerRequest = 34; + optional ContainerBalancerStatusRequestProto containerBalancerStatusRequest = 35; ++ optional FinalizeScmUpgradeRequestProto finalizeScmUpgradeRequest = 36; ++ optional QueryUpgradeFinalizationProgressRequestProto queryUpgradeFinalizationProgressRequest = 37; } message ScmContainerLocationResponse { @@@ -112,9 -112,9 +114,11 @@@ optional DatanodeUsageInfoResponseProto DatanodeUsageInfoResponse = 30; optional GetExistContainerWithPipelinesInBatchResponseProto getExistContainerWithPipelinesInBatchResponse = 31; optional GetContainerTokenResponseProto containerTokenResponse = 32; - optional FinalizeScmUpgradeResponseProto finalizeScmUpgradeResponse = 33; - optional QueryUpgradeFinalizationProgressResponseProto - queryUpgradeFinalizationProgressResponse = 34; + optional StartContainerBalancerResponseProto startContainerBalancerResponse = 33; + optional StopContainerBalancerResponseProto stopContainerBalancerResponse = 34; + optional ContainerBalancerStatusResponseProto containerBalancerStatusResponse = 35; ++ optional FinalizeScmUpgradeResponseProto finalizeScmUpgradeResponse = 36; ++ optional QueryUpgradeFinalizationProgressResponseProto queryUpgradeFinalizationProgressResponse = 37; enum Status { OK = 1; @@@ -153,8 -153,9 +157,11 @@@ enum Type DatanodeUsageInfo = 25; GetExistContainerWithPipelinesInBatch = 26; GetContainerToken = 27; - FinalizeScmUpgrade = 28; - QueryUpgradeFinalizationProgress = 29; + StartContainerBalancer = 28; + StopContainerBalancer = 29; + GetContainerBalancerStatus = 30; ++ FinalizeScmUpgrade = 31; ++ QueryUpgradeFinalizationProgress = 32; } /** diff --cc hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java index 6f9030c,a374766..d9872ce --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMClientProtocolServer.java @@@ -671,37 -675,64 +677,96 @@@ public class SCMClientProtocolServer im } @Override + public StatusAndMessages finalizeScmUpgrade(String upgradeClientID) throws + IOException { + // check admin authorization + try { + getScm().checkAdminAccess(getRemoteUser()); + } catch (IOException e) { + LOG.error("Authorization failed for finalize scm upgrade", e); + throw e; + } + return scm.finalizeUpgrade(upgradeClientID); + } + + @Override + public StatusAndMessages queryUpgradeFinalizationProgress( + String upgradeClientID, boolean force, boolean readonly) + throws IOException { + if (!readonly) { + // check admin authorization + try { + getScm().checkAdminAccess(getRemoteUser()); + } catch (IOException e) { + LOG.error("Authorization failed for query scm upgrade finalization " + + "progress", e); + throw e; + } + } + + return scm.queryUpgradeFinalizationProgress(upgradeClientID, force, + readonly); + } + ++ @Override + public boolean startContainerBalancer(Optional<Double> threshold, + Optional<Integer> idleiterations, + Optional<Integer> maxDatanodesToBalance, + Optional<Long> maxSizeToMoveInGB) throws IOException{ + getScm().checkAdminAccess(getRemoteUser()); + ContainerBalancerConfiguration cbc = new ContainerBalancerConfiguration(); + if (threshold.isPresent()) { + double tsd = threshold.get(); + Preconditions.checkState(tsd >= 0.0D && tsd < 1.0D, + "threshold should to be specified in range [0.0, 1.0)."); + cbc.setThreshold(tsd); + } + if (maxSizeToMoveInGB.isPresent()) { + long mstm = maxSizeToMoveInGB.get(); + Preconditions.checkState(mstm > 0, + "maxSizeToMoveInGB must be positive."); + cbc.setMaxSizeToMove(mstm * OzoneConsts.GB); + } + if (maxDatanodesToBalance.isPresent()) { + int mdtb = maxDatanodesToBalance.get(); + Preconditions.checkState(mdtb > 0, + "maxDatanodesToBalance must be positive."); + cbc.setMaxDatanodesToBalance(mdtb); + } + if (idleiterations.isPresent()) { + int idi = idleiterations.get(); + Preconditions.checkState(idi > 0 || idi == -1, + "idleiterations must be positive or" + + " -1(infinitly run container balancer)."); + cbc.setIdleIteration(idi); + } + + boolean isStartedSuccessfully = scm.getContainerBalancer().start(cbc); + if (isStartedSuccessfully) { + AUDIT.logWriteSuccess(buildAuditMessageForSuccess( + SCMAction.START_CONTAINER_BALANCER, null)); + } else { + AUDIT.logWriteFailure(buildAuditMessageForSuccess( + SCMAction.START_CONTAINER_BALANCER, null)); + } + return isStartedSuccessfully; + } + + @Override + public void stopContainerBalancer() throws IOException { + getScm().checkAdminAccess(getRemoteUser()); + AUDIT.logWriteSuccess(buildAuditMessageForSuccess( + SCMAction.STOP_CONTAINER_BALANCER, null)); + scm.getContainerBalancer().stop(); + } + + @Override + public boolean getContainerBalancerStatus() { + AUDIT.logWriteSuccess(buildAuditMessageForSuccess( + SCMAction.GET_CONTAINER_BALANCER_STATUS, null)); + return scm.getContainerBalancer().isBalancerRunning(); + } + /** * Get Datanode usage info such as capacity, SCMUsed, and remaining by ip * or uuid. diff --cc hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java index 3439933,5a69121..745be9d --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/SCMDatanodeHeartbeatDispatcher.java @@@ -20,8 -20,9 +20,10 @@@ package org.apache.hadoop.hdds.scm.serv import com.google.common.base.Preconditions; import org.apache.hadoop.hdds.protocol.DatanodeDetails; import org.apache.hadoop.hdds.protocol.proto + .StorageContainerDatanodeProtocolProtos.CRLStatusReport; + import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.IncrementalContainerReportProto; +import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.LayoutVersionProto; import org.apache.hadoop.hdds.protocol.proto .StorageContainerDatanodeProtocolProtos.PipelineReportsProto; import org.apache.hadoop.hdds.protocol.proto diff --cc hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java index 0f8ea75,b75112c..109e5e4 --- a/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java +++ b/hadoop-hdds/server-scm/src/main/java/org/apache/hadoop/hdds/scm/server/StorageContainerManager.java @@@ -375,19 -390,13 +404,17 @@@ public final class StorageContainerMana pipelineManager, containerManager); StartDatanodeAdminHandler datanodeStartAdminHandler = new StartDatanodeAdminHandler(scmNodeManager, pipelineManager); - NonHealthyToHealthyNodeHandler nonHealthyToHealthyNodeHandler = - new NonHealthyToHealthyNodeHandler(configuration, serviceManager); + ReadOnlyHealthyToHealthyNodeHandler readOnlyHealthyToHealthyNodeHandler = - new ReadOnlyHealthyToHealthyNodeHandler(conf, serviceManager); ++ new ReadOnlyHealthyToHealthyNodeHandler(configuration, serviceManager); + HealthyReadOnlyNodeHandler + healthyReadOnlyNodeHandler = + new HealthyReadOnlyNodeHandler(scmNodeManager, - pipelineManager, conf); ++ pipelineManager, configuration); ContainerActionsHandler actionsHandler = new ContainerActionsHandler(); - PendingDeleteHandler pendingDeleteHandler = - new PendingDeleteHandler(scmBlockManager.getSCMBlockDeletingService()); ContainerReportHandler containerReportHandler = - new ContainerReportHandler( - scmNodeManager, containerManager, scmContext, conf); + new ContainerReportHandler(scmNodeManager, containerManager, + scmContext, configuration); IncrementalContainerReportHandler incrementalContainerReportHandler = new IncrementalContainerReportHandler( diff --cc hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java index af0538a,4d364ef..db145c7 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/TestUtils.java @@@ -226,16 -232,23 +232,32 @@@ public final class TestUtils StorageTypeProto.DISK); } + /** + * Generates random metadata storage report. + * + * @param path path of the storage + * + * @return MetadataStorageReportProto + */ + public static MetadataStorageReportProto getRandomMetadataStorageReport( + String path) { + return createMetadataStorageReport(path, + random.nextInt(1000), + random.nextInt(500), + random.nextInt(500), + StorageTypeProto.DISK); + } + public static StorageReportProto createStorageReport(UUID nodeId, String path, + long capacity) { + return createStorageReport(nodeId, path, + capacity, + 0, + capacity, + StorageTypeProto.DISK); + } + + public static StorageReportProto createStorageReport(UUID nodeId, String path, long capacity, long used, long remaining, StorageTypeProto type) { return createStorageReport(nodeId, path, capacity, used, remaining, type, false); diff --cc hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java index 33272bd,f1ff0d9..411e59f --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/container/placement/algorithms/TestSCMContainerPlacementCapacity.java @@@ -34,9 -35,10 +35,11 @@@ import org.apache.hadoop.hdds.scm.node. import org.apache.hadoop.hdds.scm.node.NodeManager; import org.apache.hadoop.hdds.scm.node.NodeStatus; +import org.apache.hadoop.ozone.container.upgrade.UpgradeUtils; import org.junit.Assert; import org.junit.Test; + + import static org.apache.hadoop.hdds.scm.ScmConfigKeys.OZONE_DATANODE_RATIS_VOLUME_FREE_SPACE_MIN; import static org.mockito.Matchers.anyObject; import org.mockito.Mockito; import static org.mockito.Mockito.when; diff --cc hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeReportHandler.java index 934d986,d91f733..436a1e8 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeReportHandler.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestNodeReportHandler.java @@@ -16,9 -16,9 +16,11 @@@ */ package org.apache.hadoop.hdds.scm.node; +import static org.apache.hadoop.hdds.upgrade.HDDSLayoutVersionManager.maxLayoutVersion; + import java.io.IOException; + import java.util.Arrays; + import java.util.List; import java.util.UUID; import org.apache.hadoop.hdds.conf.OzoneConfiguration; import org.apache.hadoop.hdds.protocol.DatanodeDetails; @@@ -52,10 -52,11 +55,12 @@@ public class TestNodeReportHandler impl private static final Logger LOG = LoggerFactory .getLogger(TestNodeReportHandler.class); private NodeReportHandler nodeReportHandler; + private HDDSLayoutVersionManager versionManager; private SCMNodeManager nodeManager; private String storagePath = GenericTestUtils.getRandomizedTempPath() - .concat("/" + UUID.randomUUID().toString()); + .concat("/data-" + UUID.randomUUID().toString()); + private String metaStoragePath = GenericTestUtils.getRandomizedTempPath() + .concat("/metadata-" + UUID.randomUUID().toString()); @Before public void resetEventCollector() throws IOException { diff --cc hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java index 80886cd,17c6a0f..4554b53 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/hdds/scm/node/TestSCMNodeManager.java @@@ -231,241 -176,6 +231,242 @@@ public class TestSCMNodeManager } /** + * Tests that node manager handles layout version changes from heartbeats + * correctly. + * + * @throws IOException + * @throws InterruptedException + * @throws TimeoutException + */ + @Test + public void testScmLayoutOnHeartbeat() throws Exception { + OzoneConfiguration conf = getConf(); + conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL, + 1, TimeUnit.DAYS); + + try (SCMNodeManager nodeManager = createNodeManager(conf)) { + Assert.assertTrue(scm.checkLeader()); + // Register 2 nodes correctly. + // These will be used with a faulty node to test pipeline creation. + DatanodeDetails goodNode1 = registerWithCapacity(nodeManager); + DatanodeDetails goodNode2 = registerWithCapacity(nodeManager); + + scm.exitSafeMode(); + + assertPipelineClosedAfterLayoutHeartbeat(goodNode1, goodNode2, + nodeManager, SMALLER_MLV_LAYOUT_PROTO); + assertPipelineClosedAfterLayoutHeartbeat(goodNode1, goodNode2, + nodeManager, LARGER_MLV_SLV_LAYOUT_PROTO); + assertPipelineClosedAfterLayoutHeartbeat(goodNode1, goodNode2, + nodeManager, SMALLER_MLV_SLV_LAYOUT_PROTO); + assertPipelineClosedAfterLayoutHeartbeat(goodNode1, goodNode2, + nodeManager, LARGER_SLV_LAYOUT_PROTO); + } + } + + /** + * Create {@link DatanodeDetails} to register with {@code nodeManager}, and + * provide the datanode maximum capacity so that space used does not block + * pipeline creation. + * @return The created {@link DatanodeDetails}. + */ + private DatanodeDetails registerWithCapacity(SCMNodeManager nodeManager) { + return registerWithCapacity(nodeManager, + UpgradeUtils.defaultLayoutVersionProto(), success); + } + + /** + * Create {@link DatanodeDetails} to register with {@code nodeManager}, and + * provide the datanode maximum capacity so that space used does not block + * pipeline creation. Also check that the result of registering matched + * {@code expectedResult}. + * @return The created {@link DatanodeDetails}. + */ + private DatanodeDetails registerWithCapacity(SCMNodeManager nodeManager, + LayoutVersionProto layout, ErrorCode expectedResult) { + DatanodeDetails details = MockDatanodeDetails.randomDatanodeDetails(); + StorageReportProto storageReport = + TestUtils.createStorageReport(details.getUuid(), + details.getNetworkFullPath(), Long.MAX_VALUE); + RegisteredCommand cmd = nodeManager.register( + MockDatanodeDetails.randomDatanodeDetails(), - TestUtils.createNodeReport(storageReport), ++ TestUtils.createNodeReport(Arrays.asList(storageReport), ++ Collections.emptyList()), + getRandomPipelineReports(), layout); + + Assert.assertEquals(expectedResult, cmd.getError()); + return cmd.getDatanode(); + } + + private void assertPipelineClosedAfterLayoutHeartbeat( + DatanodeDetails originalNode1, DatanodeDetails originalNode2, + SCMNodeManager nodeManager, LayoutVersionProto layout) throws Exception { + + List<DatanodeDetails> originalNodes = + Arrays.asList(originalNode1, originalNode2); + + // Initial condition: 2 healthy nodes registered. + assertPipelines(HddsProtos.ReplicationFactor.ONE, count -> count == 2, + originalNodes); + assertPipelines(HddsProtos.ReplicationFactor.THREE, + count -> count == 0, new ArrayList<>()); + + // Even when safemode exit or new node addition trigger pipeline + // creation, they will fail with not enough healthy nodes for ratis 3 + // pipeline. Therefore we do not have to worry about this create call + // failing due to datanodes reaching their maximum pipeline limit. + assertPipelineCreationFailsWithNotEnoughNodes(2); + + // Register a new node correctly. + DatanodeDetails node = registerWithCapacity(nodeManager); + + List<DatanodeDetails> allNodes = new ArrayList<>(originalNodes); + allNodes.add(node); + + // Safemode exit and adding the new node should trigger pipeline creation. + assertPipelines(HddsProtos.ReplicationFactor.ONE, count -> count == 3, + allNodes); + assertPipelines(HddsProtos.ReplicationFactor.THREE, count -> count >= 1, + allNodes); + + // node sends incorrect layout. + nodeManager.processHeartbeat(node, layout); + + // Its pipelines should be closed then removed, meaning there is not + // enough nodes for factor 3 pipelines. + assertPipelines(HddsProtos.ReplicationFactor.ONE, count -> count == 2, + originalNodes); + assertPipelines(HddsProtos.ReplicationFactor.THREE, + count -> count == 0, new ArrayList<>()); + + assertPipelineCreationFailsWithNotEnoughNodes(2); + } + + /** + * Tests that node manager handles layout versions for newly registered nodes + * correctly. + * + * @throws IOException + * @throws InterruptedException + * @throws TimeoutException + */ + @Test + public void testScmLayoutOnRegister() + throws Exception { + + OzoneConfiguration conf = getConf(); + conf.setTimeDuration(ScmConfigKeys.OZONE_SCM_PIPELINE_CREATION_INTERVAL, + 1, TimeUnit.DAYS); + + try (SCMNodeManager nodeManager = createNodeManager(conf)) { + Assert.assertTrue(scm.checkLeader()); + // Nodes with mismatched SLV cannot join the cluster. + registerWithCapacity(nodeManager, + LARGER_SLV_LAYOUT_PROTO, errorNodeNotPermitted); + registerWithCapacity(nodeManager, + SMALLER_MLV_SLV_LAYOUT_PROTO, errorNodeNotPermitted); + registerWithCapacity(nodeManager, + LARGER_MLV_SLV_LAYOUT_PROTO, errorNodeNotPermitted); + // Nodes with mismatched MLV can join, but should not be allowed in + // pipelines. + DatanodeDetails badMlvNode1 = registerWithCapacity(nodeManager, + SMALLER_MLV_LAYOUT_PROTO, success); + DatanodeDetails badMlvNode2 = registerWithCapacity(nodeManager, + SMALLER_MLV_LAYOUT_PROTO, success); + // This node has correct MLV and SLV, so it can join and be used in + // pipelines. + DatanodeDetails goodNode = registerWithCapacity(nodeManager, + CORRECT_LAYOUT_PROTO, success); + + Assert.assertEquals(3, nodeManager.getAllNodes().size()); + + scm.exitSafeMode(); + + // SCM should auto create a factor 1 pipeline for the one healthy node. + // Still should not have enough healthy nodes for ratis 3 pipeline. + assertPipelines(HddsProtos.ReplicationFactor.ONE, + count -> count == 1, + Collections.singletonList(goodNode)); + assertPipelines(HddsProtos.ReplicationFactor.THREE, + count -> count == 0, + new ArrayList<>()); + + // Even when safemode exit or new node addition trigger pipeline + // creation, they will fail with not enough healthy nodes for ratis 3 + // pipeline. Therefore we do not have to worry about this create call + // failing due to datanodes reaching their maximum pipeline limit. + assertPipelineCreationFailsWithNotEnoughNodes(1); + + // Heartbeat bad MLV nodes back to healthy. + nodeManager.processHeartbeat(badMlvNode1, CORRECT_LAYOUT_PROTO); + nodeManager.processHeartbeat(badMlvNode2, CORRECT_LAYOUT_PROTO); + + // After moving out of healthy readonly, pipeline creation should be + // triggered. + assertPipelines(HddsProtos.ReplicationFactor.ONE, + count -> count == 3, + Arrays.asList(badMlvNode1, badMlvNode2, goodNode)); + assertPipelines(HddsProtos.ReplicationFactor.THREE, + count -> count >= 1, + Arrays.asList(badMlvNode1, badMlvNode2, goodNode)); + } + } + + private void assertPipelineCreationFailsWithNotEnoughNodes( + int actualNodeCount) throws Exception { + try { + ReplicationConfig ratisThree = + ReplicationConfig.fromTypeAndFactor(HddsProtos.ReplicationType.RATIS, + HddsProtos.ReplicationFactor.THREE); + scm.getPipelineManager().createPipeline(ratisThree); + Assert.fail("3 nodes should not have been found for a pipeline."); + } catch (SCMException ex) { + Assert.assertTrue(ex.getMessage().contains("Required 3. Found " + + actualNodeCount)); + } + } + + private void assertPipelines(HddsProtos.ReplicationFactor factor, + Predicate<Integer> countCheck, Collection<DatanodeDetails> allowedDNs) + throws Exception { + + Set<String> allowedDnIds = allowedDNs.stream() + .map(DatanodeDetails::getUuidString) + .collect(Collectors.toSet()); + + RatisReplicationConfig replConfig = new RatisReplicationConfig(factor); + + // Wait for the expected number of pipelines using allowed DNs. + GenericTestUtils.waitFor(() -> { + List<Pipeline> pipelines = scm.getPipelineManager() + .getPipelines(replConfig); + LOG.info("Found {} pipelines of type {} and factor {}.", pipelines.size(), + replConfig.getReplicationType(), replConfig.getReplicationFactor()); + boolean success = countCheck.test(pipelines.size()); + + // If we have the correct number of pipelines, make sure that none of + // these pipelines use nodes outside of allowedDNs. + if (success) { + for (Pipeline pipeline: pipelines) { + for(DatanodeDetails pipelineDN: pipeline.getNodes()) { + // Do not wait for this condition to be true. Disallowed DNs should + // never be used once we have the expected number of pipelines. + if (!allowedDnIds.contains(pipelineDN.getUuidString())) { + String message = String.format("Pipeline %s used datanode %s " + + "which is not in the set of allowed datanodes: %s", + pipeline.getId().toString(), pipelineDN.getUuidString(), + allowedDnIds.toString()); + Assert.fail(message); + } + } + } + } + + return success; + }, 1000, 10000); + } + + /** * asserts that if we send no heartbeats node manager stays in safemode. * * @throws IOException @@@ -1338,8 -937,9 +1339,9 @@@ String storagePath = testDir.getAbsolutePath() + "/" + dnId; StorageReportProto report = TestUtils .createStorageReport(dnId, storagePath, capacity, used, free, null); - nodeManager.register(dn, TestUtils.createNodeReport(report), null); + nodeManager.register(dn, TestUtils.createNodeReport( + Arrays.asList(report), Collections.emptyList()), null); - nodeManager.processHeartbeat(dn); + nodeManager.processHeartbeat(dn, layoutInfo); } //TODO: wait for EventQueue to be processed eventQueue.processAll(8000L); @@@ -1386,17 -986,13 +1388,18 @@@ for (int x = 0; x < volumeCount; x++) { String storagePath = testDir.getAbsolutePath() + "/" + dnId; reports.add(TestUtils - .createStorageReport(dnId, storagePath, capacity, - used, free, null, failed)); + .createStorageReport(dnId, storagePath, capacity, + used, free, null, failed)); failed = !failed; } - nodeManager.register(dn, TestUtils.createNodeReport(reports), null); + nodeManager.register(dn, TestUtils.createNodeReport(reports, + Collections.emptyList()), null); - nodeManager.processHeartbeat(dn); + LayoutVersionManager versionManager = + nodeManager.getLayoutVersionManager(); + LayoutVersionProto layoutInfo = toLayoutVersionProto( + versionManager.getMetadataLayoutVersion(), + versionManager.getSoftwareLayoutVersion()); + nodeManager.processHeartbeat(dn, layoutInfo); //TODO: wait for EventQueue to be processed eventQueue.processAll(8000L); @@@ -1446,16 -1042,12 +1449,17 @@@ StorageReportProto report = TestUtils .createStorageReport(dnId, storagePath, capacity, scmUsed, remaining, null); - NodeReportProto nodeReportProto = TestUtils.createNodeReport(report); + NodeReportProto nodeReportProto = TestUtils.createNodeReport( + Arrays.asList(report), Collections.emptyList()); nodeReportHandler.onMessage( - new NodeReportFromDatanode(datanodeDetails, nodeReportProto), - publisher); - nodeManager.processHeartbeat(datanodeDetails); + new NodeReportFromDatanode(datanodeDetails, nodeReportProto), + publisher); + LayoutVersionManager versionManager = + nodeManager.getLayoutVersionManager(); + LayoutVersionProto layoutInfo = toLayoutVersionProto( + versionManager.getMetadataLayoutVersion(), + versionManager.getSoftwareLayoutVersion()); + nodeManager.processHeartbeat(datanodeDetails, layoutInfo); Thread.sleep(100); } @@@ -1759,15 -1342,11 +1765,20 @@@ .createStorageReport(dnId, storagePath, capacity, used, remaining, null); + nodeManager.register(datanodeDetails, TestUtils.createNodeReport( + Arrays.asList(report), Collections.emptyList()), + TestUtils.getRandomPipelineReports()); - nodeManager.processHeartbeat(datanodeDetails); + LayoutVersionManager versionManager = + nodeManager.getLayoutVersionManager(); + LayoutVersionProto layoutInfo = toLayoutVersionProto( + versionManager.getMetadataLayoutVersion(), + versionManager.getSoftwareLayoutVersion()); - nodeManager.register(datanodeDetails, TestUtils.createNodeReport(report), ++ nodeManager.register(datanodeDetails, ++ TestUtils.createNodeReport(Arrays.asList(report), ++ Collections.emptyList()), + TestUtils.getRandomPipelineReports(), layoutInfo); + nodeManager.processHeartbeat(datanodeDetails, layoutInfo); if (i == 5) { nodeManager.setNodeOperationalState(datanodeDetails, HddsProtos.NodeOperationalState.ENTERING_MAINTENANCE); diff --cc hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java index 1dcefa4,4a91502..85643d8 --- a/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java +++ b/hadoop-hdds/server-scm/src/test/java/org/apache/hadoop/ozone/container/common/TestEndPoint.java @@@ -275,10 -274,12 +277,13 @@@ public class TestEndPoint SCMRegisteredResponseProto responseProto = rpcEndPoint.getEndPoint() .register(nodeToRegister.getExtendedProtoBufMessage(), TestUtils .createNodeReport( - getStorageReports(nodeToRegister.getUuid())), + Arrays.asList(getStorageReports( + nodeToRegister.getUuid())), + Arrays.asList(getMetadataStorageReports( + nodeToRegister.getUuid()))), TestUtils.getRandomContainerReports(10), - TestUtils.getRandomPipelineReports()); + TestUtils.getRandomPipelineReports(), + defaultLayoutVersionProto()); Assert.assertNotNull(responseProto); Assert.assertEquals(nodeToRegister.getUuidString(), responseProto.getDatanodeUUID()); @@@ -294,7 -295,14 +299,13 @@@ return TestUtils.createStorageReport(id, storagePath, 100, 10, 90, null); } + private MetadataStorageReportProto getMetadataStorageReports(UUID id) { + String storagePath = testDir.getAbsolutePath() + "/metadata-" + id; + return TestUtils.createMetadataStorageReport(storagePath, 100, 10, 90, + null); + } + - private EndpointStateMachine registerTaskHelper( - InetSocketAddress scmAddress, + private EndpointStateMachine registerTaskHelper(InetSocketAddress scmAddress, int rpcTimeout, boolean clearDatanodeDetails ) throws Exception { OzoneConfiguration conf = SCMTestUtils.getConf(); diff --cc hadoop-ozone/dist/src/main/compose/upgrade/compose/non-ha/docker-config index a6d4482,69e7b91..1a7419c --- a/hadoop-ozone/dist/src/main/compose/upgrade/compose/non-ha/docker-config +++ b/hadoop-ozone/dist/src/main/compose/upgrade/compose/non-ha/docker-config @@@ -14,13 -14,12 +14,14 @@@ # See the License for the specific language governing permissions and # limitations under the License. -CORE-SITE.XML_fs.ofs.impl=org.apache.hadoop.fs.ozone.RootedOzoneFileSystem -CORE-SITE.XML_fs.o3fs.impl=org.apache.hadoop.fs.ozone.OzoneFileSystem +OZONE-SITE.XML_ozone.metadata.dirs=/data/metadata + +OZONE-SITE.XML_ozone.client.failover.max.attempts=6 + OZONE-SITE.XML_ozone.om.address=om OZONE-SITE.XML_ozone.om.http-address=om:9874 - + OZONE-SITE.XML_ozone.scm.container.size=1GB + OZONE-SITE.XML_ozone.scm.datanode.ratis.volume.free-space.min=10MB OZONE-SITE.XML_ozone.scm.pipeline.creation.interval=30s OZONE-SITE.XML_ozone.scm.pipeline.owner.container.count=1 OZONE-SITE.XML_ozone.scm.names=scm diff --cc hadoop-ozone/dist/src/main/compose/upgrade/test.sh index 2857c1a,0aa1419..a323310 --- a/hadoop-ozone/dist/src/main/compose/upgrade/test.sh +++ b/hadoop-ozone/dist/src/main/compose/upgrade/test.sh @@@ -15,27 -15,18 +15,30 @@@ # See the License for the specific language governing permissions and # limitations under the License. -SCRIPT_DIR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd ) -ALL_RESULT_DIR="$SCRIPT_DIR/result" -mkdir -p "$ALL_RESULT_DIR" -rm "$ALL_RESULT_DIR"/* || true -source "$SCRIPT_DIR/../testlib.sh" +# Version that will be run using the local build. +: "${OZONE_CURRENT_VERSION:=1.1.0}" +export OZONE_CURRENT_VERSION -tests=$(find_tests) -cd "$SCRIPT_DIR" +TEST_DIR=$( cd "$( dirname "${BASH_SOURCE[0]}" )" >/dev/null && pwd ) +source "$TEST_DIR/testlib.sh" + +# Export variables needed by tests and ../testlib.sh. +export TEST_DIR +export COMPOSE_DIR="$TEST_DIR" + RESULT=0 + run_test_scripts ${tests} || RESULT=$? + -generate_report "upgrade" "${ALL_RESULT_DIR}" +RESULT_DIR="$ALL_RESULT_DIR" create_results_dir + +# Upgrade tests to be run. +# Run all upgrades even if one fails. +# Any failure will save a failing return code to $RESULT. +set +e +run_test manual-upgrade 0.5.0 1.1.0 +run_test non-rolling-upgrade 1.0.0 1.1.0 +set -e + +generate_report "upgrade" "$ALL_RESULT_DIR" -exit ${RESULT} +exit "$RESULT" diff --cc hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOpenContainerCount.java index 0000000,2647c9a..0e66553 mode 000000,100644..100644 --- a/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOpenContainerCount.java +++ b/hadoop-ozone/recon/src/test/java/org/apache/hadoop/ozone/recon/api/TestOpenContainerCount.java @@@ -1,0 -1,424 +1,427 @@@ + /* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * <p> + * http://www.apache.org/licenses/LICENSE-2.0 + * <p> + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.hadoop.ozone.recon.api; + + import org.apache.hadoop.ozone.OzoneConsts; + import org.apache.hadoop.hdds.client.RatisReplicationConfig; + import org.apache.hadoop.hdds.protocol.DatanodeDetails; + import org.apache.hadoop.hdds.protocol.proto.HddsProtos; + import org.apache.hadoop.hdds.protocol.proto.HddsProtos + .ExtendedDatanodeDetailsProto; + import org.apache.hadoop.hdds.protocol.proto.HddsProtos.LifeCycleState; + import org.apache.hadoop.hdds.protocol.proto.HddsProtos.PipelineID; + import org.apache.hadoop.hdds.protocol.proto.HddsProtos.ReplicationFactor; + import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReplicaProto; + import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReport; + import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.SCMHeartbeatRequestProto; + import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageTypeProto; + import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto.Builder; + import org.apache.hadoop.hdds.protocol.proto.HddsProtos.DatanodeDetailsProto; + import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.StorageReportProto; + import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.NodeReportProto; + import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.ContainerReportsProto; + import org.apache.hadoop.hdds.protocol.proto.StorageContainerDatanodeProtocolProtos.PipelineReportsProto; + import org.apache.hadoop.hdds.scm.container.ContainerInfo; + import org.apache.hadoop.hdds.scm.container.common.helpers.ContainerWithPipeline; + import org.apache.hadoop.hdds.scm.pipeline.Pipeline; + import org.apache.hadoop.hdds.scm.protocol.StorageContainerLocationProtocol; + import org.apache.hadoop.hdds.scm.server.OzoneStorageContainerManager; + import org.apache.hadoop.hdfs.web.URLConnectionFactory; + import org.apache.hadoop.ozone.recon.MetricsServiceProviderFactory; + import org.apache.hadoop.ozone.recon.ReconTestInjector; + import org.apache.hadoop.ozone.recon.ReconUtils; + import org.apache.hadoop.ozone.recon.api.types.DatanodeMetadata; + import org.apache.hadoop.ozone.recon.api.types.DatanodesResponse; + import org.apache.hadoop.ozone.recon.persistence.ContainerHealthSchemaManager; + import org.apache.hadoop.ozone.recon.recovery.ReconOMMetadataManager; + import org.apache.hadoop.ozone.recon.scm.ReconStorageContainerManagerFacade; + import org.apache.hadoop.ozone.recon.spi.StorageContainerServiceProvider; + import org.apache.hadoop.ozone.recon.spi.impl.OzoneManagerServiceProviderImpl; + import org.apache.hadoop.ozone.recon.spi.impl.StorageContainerServiceProviderImpl; + import org.apache.ozone.test.LambdaTestUtils; + import org.junit.Assert; + import org.junit.Before; + import org.junit.Rule; + import org.junit.Test; + import org.junit.rules.TemporaryFolder; + + import static org.apache.hadoop.hdds.protocol.MockDatanodeDetails.randomDatanodeDetails; ++import static org.apache.hadoop.ozone.container.upgrade.UpgradeUtils.defaultLayoutVersionProto; + import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getRandomPipeline; + import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.getTestReconOmMetadataManager; + import static org.apache.hadoop.ozone.recon.OMMetadataManagerTestUtils.initializeNewOmMetadataManager; + import static org.mockito.ArgumentMatchers.any; + import static org.mockito.ArgumentMatchers.anyBoolean; + import static org.mockito.ArgumentMatchers.anyString; + import static org.mockito.Mockito.mock; + import static org.mockito.Mockito.when; + + import javax.servlet.http.HttpServletResponse; + import javax.ws.rs.core.Response; + + import java.io.IOException; + import java.net.HttpURLConnection; + import java.util.*; + import java.util.concurrent.Callable; + + /** + * Test for Open Container count per Datanode. + */ + public class TestOpenContainerCount { + @Rule + public TemporaryFolder temporaryFolder = new TemporaryFolder(); + + private NodeEndpoint nodeEndpoint; + private ReconOMMetadataManager reconOMMetadataManager; + private ReconStorageContainerManagerFacade reconScm; + private boolean isSetupDone = false; + private String pipelineId; + private String pipelineId2; + private DatanodeDetails datanodeDetails; + private String datanodeId; + private ContainerReportsProto containerReportsProto; + private Builder builder; + private ExtendedDatanodeDetailsProto extendedDatanodeDetailsProto; + private NodeReportProto nodeReportProto; + private PipelineReportsProto pipelineReportsProto; + private Pipeline pipeline; + private Pipeline pipeline2; + private static final String HOST1 = "host1.datanode"; + private static final String IP1 = "1.1.1.1"; + private ReconUtils reconUtilsMock; + private StorageContainerServiceProvider mockScmServiceProvider; + + private List<Long> containerIDs; + + private List<ContainerWithPipeline> cpw; + + private void initializeInjector() throws Exception { + reconOMMetadataManager = getTestReconOmMetadataManager( + initializeNewOmMetadataManager(temporaryFolder.newFolder()), + temporaryFolder.newFolder()); + datanodeDetails = randomDatanodeDetails(); + datanodeDetails.setHostName(HOST1); + datanodeDetails.setIpAddress(IP1); + pipeline = getRandomPipeline(datanodeDetails); + pipelineId = pipeline.getId().getId().toString(); + + pipeline2 = getRandomPipeline(datanodeDetails); + pipelineId2 = pipeline2.getId().getId().toString(); + + StorageContainerLocationProtocol mockScmClient = mock( + StorageContainerLocationProtocol.class); + mockScmServiceProvider = mock( + StorageContainerServiceProviderImpl.class); + + when(mockScmServiceProvider.getPipeline( + pipeline.getId().getProtobuf())).thenReturn(pipeline); + when(mockScmServiceProvider.getPipeline( + pipeline2.getId().getProtobuf())).thenReturn(pipeline2); + + // Open 5 containers on pipeline 1 + containerIDs = new LinkedList<>(); + cpw = new LinkedList<>(); + for (long i = 1L; i <= 5L; ++i) { + ContainerInfo containerInfo = new ContainerInfo.Builder() + .setContainerID(i) + .setReplicationConfig( + new RatisReplicationConfig(ReplicationFactor.ONE)) + .setState(LifeCycleState.OPEN) + .setOwner("test") + .setPipelineID(pipeline.getId()) + .build(); + ContainerWithPipeline containerWithPipeline = + new ContainerWithPipeline(containerInfo, pipeline); + when(mockScmServiceProvider.getContainerWithPipeline(i)) + .thenReturn(containerWithPipeline); + containerIDs.add(i); + cpw.add(containerWithPipeline); + } + + // Open 5 containers on pipeline 2 + for (long i = 6L; i <= 10L; ++i) { + ContainerInfo containerInfo = new ContainerInfo.Builder() + .setContainerID(i) + .setReplicationConfig( + new RatisReplicationConfig(ReplicationFactor.ONE)) + .setState(LifeCycleState.OPEN) + .setOwner("test") + .setPipelineID(pipeline2.getId()) + .build(); + ContainerWithPipeline containerWithPipeline = + new ContainerWithPipeline(containerInfo, pipeline2); + when(mockScmServiceProvider.getContainerWithPipeline(i)) + .thenReturn(containerWithPipeline); + containerIDs.add(i); + cpw.add(containerWithPipeline); + } + + when(mockScmServiceProvider + .getExistContainerWithPipelinesInBatch(containerIDs)) + .thenReturn(cpw); + + reconUtilsMock = mock(ReconUtils.class); + HttpURLConnection urlConnectionMock = mock(HttpURLConnection.class); + when(urlConnectionMock.getResponseCode()) + .thenReturn(HttpServletResponse.SC_OK); + when(reconUtilsMock.makeHttpCall(any(URLConnectionFactory.class), + anyString(), anyBoolean())).thenReturn(urlConnectionMock); + + ReconTestInjector reconTestInjector = + new ReconTestInjector.Builder(temporaryFolder) + .withReconSqlDb() + .withReconOm(reconOMMetadataManager) + .withOmServiceProvider( + mock(OzoneManagerServiceProviderImpl.class)) + .addBinding(StorageContainerServiceProvider.class, + mockScmServiceProvider) + .addBinding(OzoneStorageContainerManager.class, + ReconStorageContainerManagerFacade.class) + .withContainerDB() + .addBinding(NodeEndpoint.class) + .addBinding(MetricsServiceProviderFactory.class) + .addBinding(ContainerHealthSchemaManager.class) + .addBinding(ReconUtils.class, reconUtilsMock) + .addBinding(StorageContainerLocationProtocol.class, + mockScmClient) + .build(); + + nodeEndpoint = reconTestInjector.getInstance(NodeEndpoint.class); + reconScm = (ReconStorageContainerManagerFacade) + reconTestInjector.getInstance(OzoneStorageContainerManager.class); + } + + @Before + public void setUp() throws Exception { + // The following setup runs only once + if (!isSetupDone) { + initializeInjector(); + isSetupDone = true; + } + datanodeId = datanodeDetails.getUuid().toString(); + + // initialize container report + builder = ContainerReportsProto.newBuilder(); + for (long i = 1L; i <= 10L; i++) { + builder.addReports( + ContainerReplicaProto.newBuilder() + .setContainerID(i) + .setState(ContainerReplicaProto.State.OPEN) + .setOriginNodeId(datanodeId) + .build() + ); + } + containerReportsProto = builder.build(); + + UUID pipelineUuid = UUID.fromString(pipelineId); + HddsProtos.UUID uuid128 = HddsProtos.UUID.newBuilder() + .setMostSigBits(pipelineUuid.getMostSignificantBits()) + .setLeastSigBits(pipelineUuid.getLeastSignificantBits()) + .build(); + + UUID pipelineUuid2 = UUID.fromString(pipelineId2); + HddsProtos.UUID uuid1282 = HddsProtos.UUID.newBuilder() + .setMostSigBits(pipelineUuid2.getMostSignificantBits()) + .setLeastSigBits(pipelineUuid2.getLeastSignificantBits()) + .build(); + + PipelineReport pipelineReport = PipelineReport.newBuilder() + .setPipelineID( + PipelineID.newBuilder() + .setId(pipelineId) + .setUuid128(uuid128) + .build()) + .setIsLeader(true) + .build(); + + PipelineReport pipelineReport2 = PipelineReport.newBuilder() + .setPipelineID( + PipelineID + .newBuilder() + .setId(pipelineId2) + .setUuid128(uuid1282) + .build()) + .setIsLeader(false) + .build(); + + pipelineReportsProto = + PipelineReportsProto.newBuilder() + .addPipelineReport(pipelineReport) + .addPipelineReport(pipelineReport2) + .build(); + + DatanodeDetailsProto datanodeDetailsProto = + DatanodeDetailsProto.newBuilder() + .setHostName(HOST1) + .setUuid(datanodeId) + .setIpAddress(IP1) + .build(); + + extendedDatanodeDetailsProto = + HddsProtos.ExtendedDatanodeDetailsProto.newBuilder() + .setDatanodeDetails(datanodeDetailsProto) + .setVersion("0.6.0") + .setSetupTime(1596347628802L) + .setBuildDate("2020-08-01T08:50Z") + .setRevision("3346f493fa1690358add7bb9f3e5b52545993f36") + .build(); + + StorageReportProto storageReportProto1 = + StorageReportProto.newBuilder() + .setStorageType(StorageTypeProto.DISK) + .setStorageLocation("/disk1") + .setScmUsed(10 * OzoneConsts.GB) + .setRemaining(90 * OzoneConsts.GB) + .setCapacity(100 * OzoneConsts.GB) + .setStorageUuid(UUID.randomUUID().toString()) + .setFailed(false).build(); + + StorageReportProto storageReportProto2 = + StorageReportProto.newBuilder() + .setStorageType(StorageTypeProto.DISK) + .setStorageLocation("/disk2") + .setScmUsed(10 * OzoneConsts.GB) + .setRemaining(90 * OzoneConsts.GB) + .setCapacity(100 * OzoneConsts.GB) + .setStorageUuid(UUID.randomUUID().toString()) + .setFailed(false).build(); + + nodeReportProto = + NodeReportProto.newBuilder() + .addStorageReport(storageReportProto1) + .addStorageReport(storageReportProto2).build(); + + try { + reconScm.getDatanodeProtocolServer() + .register(extendedDatanodeDetailsProto, nodeReportProto, - containerReportsProto, pipelineReportsProto); ++ containerReportsProto, pipelineReportsProto, ++ defaultLayoutVersionProto()); + // Process all events in the event queue + reconScm.getEventQueue().processAll(1000); + } catch (Exception ex) { + Assert.fail(ex.getMessage()); + } + } + + @Test + public void testOpenContainerCount() throws Exception { + // In case of pipeline doesn't exist + waitAndCheckConditionAfterHeartbeat(() -> { + + DatanodeMetadata datanodeMetadata1 = getDatanodeMetadata(); + return datanodeMetadata1.getContainers() == 10 + && datanodeMetadata1.getPipelines().size() == 2; + }); + + DatanodeMetadata datanodeMetadata = getDatanodeMetadata(); + + int expectedCnt = datanodeMetadata.getOpenContainers(); + + // check if open container's count decrement according + for (long id = 1L; id <= 10L; ++id) { + --expectedCnt; + closeContainer(id); + DatanodeMetadata metadata = getDatanodeMetadata(); + Assert.assertEquals(expectedCnt, metadata.getOpenContainers()); + } + } + + private DatanodeMetadata getDatanodeMetadata() { + Response response = nodeEndpoint.getDatanodes(); + DatanodesResponse datanodesResponse = + (DatanodesResponse) response.getEntity(); + + DatanodeMetadata datanodeMetadata = + datanodesResponse.getDatanodes().stream().filter(metadata -> + metadata.getHostname().equals("host1.datanode")) + .findFirst().orElse(null); + return datanodeMetadata; + } + + private void closeContainer(long containerID) throws IOException { + + if (containerID >= 1L && containerID <= 5L) { + ContainerInfo closedContainer = new ContainerInfo.Builder() + .setContainerID(containerID) + .setReplicationConfig( + new RatisReplicationConfig(ReplicationFactor.ONE)) + .setState(LifeCycleState.CLOSED) + .setOwner("test") + .setPipelineID(pipeline.getId()) + .build(); + ContainerWithPipeline containerWithPipeline = + new ContainerWithPipeline(closedContainer, pipeline); + when(mockScmServiceProvider.getContainerWithPipeline(containerID)) + .thenReturn(containerWithPipeline); + cpw.set((int) containerID - 1, containerWithPipeline); + } else if (containerID >= 6L && containerID <= 10L) { + ContainerInfo closedContainer = new ContainerInfo.Builder() + .setContainerID(containerID) + .setReplicationConfig( + new RatisReplicationConfig(ReplicationFactor.ONE)) + .setState(LifeCycleState.CLOSED) + .setOwner("test") + .setPipelineID(pipeline2.getId()) + .build(); + ContainerWithPipeline containerWithPipeline = + new ContainerWithPipeline(closedContainer, pipeline2); + when(mockScmServiceProvider.getContainerWithPipeline(containerID)) + .thenReturn(containerWithPipeline); + cpw.set((int) containerID - 1, containerWithPipeline); + } + when(mockScmServiceProvider + .getExistContainerWithPipelinesInBatch(containerIDs)) + .thenReturn(cpw); + updateContainerReport(containerID); + } + + private void updateContainerReport(long containerId) { + containerReportsProto = builder.setReports((int) containerId - 1, + ContainerReplicaProto.newBuilder() + .setContainerID(containerId) + .setState(ContainerReplicaProto.State.CLOSED) + .setOriginNodeId(datanodeId) + .build()) + .build(); + try { + reconScm.getDatanodeProtocolServer() + .register(extendedDatanodeDetailsProto, nodeReportProto, - containerReportsProto, pipelineReportsProto); ++ containerReportsProto, pipelineReportsProto, ++ defaultLayoutVersionProto()); + // Process all events in the event queue + reconScm.getEventQueue().processAll(1000); + } catch (Exception ex) { + Assert.fail(ex.getMessage()); + } + } + + private void waitAndCheckConditionAfterHeartbeat(Callable<Boolean> check) + throws Exception { + // if container report is processed first, and pipeline does not exist + // then container is not added until the next container report is processed + SCMHeartbeatRequestProto heartbeatRequestProto = + SCMHeartbeatRequestProto.newBuilder() + .setContainerReport(containerReportsProto) + .setDatanodeDetails(extendedDatanodeDetailsProto + .getDatanodeDetails()) + .build(); + + reconScm.getDatanodeProtocolServer().sendHeartbeat(heartbeatRequestProto); + LambdaTestUtils.await(30000, 1000, check); + } + } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
