bshashikant commented on a change in pull request #1953: URL: https://github.com/apache/ozone/pull/1953#discussion_r583525465
########## File path: hadoop-ozone/integration-test/src/test/java/org/apache/hadoop/ozone/MiniOzoneHAClusterImpl.java ########## @@ -0,0 +1,738 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ozone; + +import com.google.common.base.Preconditions; +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import org.apache.hadoop.hdds.conf.OzoneConfiguration; +import org.apache.hadoop.hdds.scm.ScmConfigKeys; +import org.apache.hadoop.hdds.scm.TestUtils; +import org.apache.hadoop.hdds.scm.safemode.HealthyPipelineSafeModeRule; +import org.apache.hadoop.hdds.scm.server.StorageContainerManager; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.net.NetUtils; +import org.apache.hadoop.ozone.client.ObjectStore; +import org.apache.hadoop.ozone.client.OzoneClient; +import org.apache.hadoop.ozone.client.OzoneClientFactory; +import org.apache.hadoop.ozone.ha.ConfUtils; +import org.apache.hadoop.ozone.om.OMConfigKeys; +import org.apache.hadoop.ozone.om.OzoneManager; +import org.apache.hadoop.ozone.recon.ReconServer; +import org.apache.hadoop.security.authentication.client.AuthenticationException; +import 
org.apache.hadoop.test.GenericTestUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.net.BindException; +import java.util.ArrayList; +import java.util.List; +import java.util.Map; +import java.util.Random; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.TimeoutException; +import java.util.function.Function; + +import static org.apache.hadoop.hdds.HddsConfigKeys.OZONE_METADATA_DIRS; + +/** + * MiniOzoneHAClusterImpl creates a complete in-process Ozone cluster + * with OM HA and SCM HA suitable for running tests. + * The cluster consists of a set of + * OzoneManagers, StorageContainerManagers and multiple DataNodes. + */ +public class MiniOzoneHAClusterImpl extends MiniOzoneClusterImpl { + + private static final Logger LOG = + LoggerFactory.getLogger(MiniOzoneHAClusterImpl.class); + + private final OMHAService omhaService; + private final SCMHAService scmhaService; + private static ObjectStore store = null; + + private int waitForClusterToBeReadyTimeout = 120000; // 2 min + + private static final Random RANDOM = new Random(); + private static final int RATIS_RPC_TIMEOUT = 1000; // 1 second + public static final int NODE_FAILURE_TIMEOUT = 2000; // 2 seconds + + /** + * Creates a new MiniOzoneCluster. 
+ * + * @throws IOException if there is an I/O error + */ + @SuppressWarnings("checkstyle:ParameterNumber") + public MiniOzoneHAClusterImpl( + OzoneConfiguration conf, + List<OzoneManager> activeOMList, + List<OzoneManager> inactiveOMList, + List<StorageContainerManager> activeSCMList, + List<StorageContainerManager> inactiveSCMList, + List<HddsDatanodeService> hddsDatanodes, + String omServiceId, + String scmServiceId, + ReconServer reconServer) { + super(conf, hddsDatanodes, reconServer); + omhaService = + new OMHAService(activeOMList, inactiveOMList, omServiceId); + scmhaService = + new SCMHAService(activeSCMList, inactiveSCMList, scmServiceId); + } + + /** + * Creates a new MiniOzoneCluster with all OMs active. + * This is used by MiniOzoneChaosCluster. + */ + protected MiniOzoneHAClusterImpl( + OzoneConfiguration conf, + List<OzoneManager> omList, + List<StorageContainerManager> scmList, + List<HddsDatanodeService> hddsDatanodes, + String omServiceId, + String scmServiceId) { + this(conf, omList, null, scmList, null, hddsDatanodes, + omServiceId, scmServiceId, null); + } + + @Override + public String getOMServiceId() { + return omhaService.getServiceId(); + } + + @Override + public String getSCMServiceId() { + return scmhaService.getServiceId(); + } + + /** + * Returns the first OzoneManager from the list. + * @return + */ + @Override + public OzoneManager getOzoneManager() { + return this.omhaService.getServices().get(0); + } + + @Override + public OzoneClient getRpcClient() throws IOException { + String omServiceId = omhaService.getServiceId(); + if (omServiceId == null) { + // Non-HA cluster. 
+ return OzoneClientFactory.getRpcClient(getConf()); + } else { + // HA cluster + return OzoneClientFactory.getRpcClient(omServiceId, getConf()); + } + } + + public boolean isOMActive(String omNodeId) { + return omhaService.isServiceActive(omNodeId); + } + + public OzoneManager getOzoneManager(int index) { + return this.omhaService.getServiceByIndex(index); + } + + public OzoneManager getOzoneManager(String omNodeId) { + return this.omhaService.getServiceById(omNodeId); + } + + public List<OzoneManager> getOzoneManagersList() { + return omhaService.getServices(); + } + + /** + * Get OzoneManager leader object. + * @return OzoneManager object, null if there isn't one or more than one + */ + public OzoneManager getOMLeader() { + OzoneManager res = null; + for (OzoneManager ozoneManager : this.omhaService.getServices()) { + if (ozoneManager.isLeaderReady()) { + if (res != null) { + // Found more than one leader + // Return null, expect the caller to retry in a while + return null; + } + // Found a leader + res = ozoneManager; + } + } + return res; + } + + /** + * Start a previously inactive OM. 
+ */ + public void startInactiveOM(String omNodeID) throws IOException { + omhaService.startInactiveService(omNodeID, OzoneManager::start); + } + + @Override + public void restartOzoneManager() throws IOException { + for (OzoneManager ozoneManager : this.omhaService.getServices()) { + ozoneManager.stop(); + ozoneManager.restart(); + } + } + + public void shutdownOzoneManager(OzoneManager ozoneManager) { + LOG.info("Shutting down OzoneManager " + ozoneManager.getOMNodeId()); + + ozoneManager.stop(); + } + + public void restartOzoneManager(OzoneManager ozoneManager, boolean waitForOM) + throws IOException, TimeoutException, InterruptedException { + LOG.info("Restarting OzoneManager " + ozoneManager.getOMNodeId()); + ozoneManager.restart(); + + if (waitForOM) { + GenericTestUtils.waitFor(ozoneManager::isRunning, + 1000, waitForClusterToBeReadyTimeout); + } + } + + public String getClusterId() throws IOException { + return scmhaService.getServices().get(0) + .getClientProtocolServer().getScmInfo().getClusterId(); + } + + public StorageContainerManager getActiveSCM() { + for (StorageContainerManager scm : scmhaService.getServices()) { + if (scm.checkLeader()) { + return scm; + } + } + return null; + } + + public void waitForSCMToBeReady() + throws TimeoutException, InterruptedException { + GenericTestUtils.waitFor(() -> { + for (StorageContainerManager scm : scmhaService.getServices()) { + if (scm.checkLeader()) { + return true; + } + } + return false; + }, 1000, waitForClusterToBeReadyTimeout); + } + + @Override + public void stop() { + for (OzoneManager ozoneManager : this.omhaService.getServices()) { + if (ozoneManager != null) { + LOG.info("Stopping the OzoneManager {}", ozoneManager.getOMNodeId()); + ozoneManager.stop(); + ozoneManager.join(); + } + } + + for (StorageContainerManager scm : this.scmhaService.getServices()) { + if (scm != null) { + LOG.info("Stopping the StorageContainerManager {}", scm.getScmId()); + scm.stop(); + scm.join(); + } + } + super.stop(); 
+ } + + public void stopOzoneManager(int index) { + omhaService.getServices().get(index).stop(); + omhaService.getServices().get(index).join(); + } + + public void stopOzoneManager(String omNodeId) { + omhaService.getServiceById(omNodeId).stop(); + omhaService.getServiceById(omNodeId).join(); + } + + /** + * Builder for configuring the MiniOzoneCluster to run. + */ + public static class Builder extends MiniOzoneClusterImpl.Builder { + + private static final String OM_NODE_ID_PREFIX = "omNode-"; + private List<OzoneManager> activeOMs = new ArrayList<>(); + private List<OzoneManager> inactiveOMs = new ArrayList<>(); + + private static final String SCM_NODE_ID_PREFIX = "scmNode-"; + private List<StorageContainerManager> activeSCMs = new ArrayList<>(); + private List<StorageContainerManager> inactiveSCMs = new ArrayList<>(); + + /** + * Creates a new Builder. + * + * @param conf configuration + */ + public Builder(OzoneConfiguration conf) { + super(conf); + } + + public List<OzoneManager> getActiveOMs() { + return activeOMs; + } + + public List<OzoneManager> getInactiveOMs() { + return inactiveOMs; + } + + @Override + public MiniOzoneCluster build() throws IOException { + if (numOfActiveOMs > numOfOMs) { + throw new IllegalArgumentException("Number of active OMs cannot be " + + "more than the total number of OMs"); + } + + // If num of ActiveOMs is not set, set it to numOfOMs. + if (numOfActiveOMs == ACTIVE_OMS_NOT_SET) { + numOfActiveOMs = numOfOMs; + } + + // If num of ActiveOMs is not set, set it to numOfOMs. 
+ if (numOfActiveSCMs == ACTIVE_SCMS_NOT_SET) { + numOfActiveSCMs = numOfSCMs; + } + + DefaultMetricsSystem.setMiniClusterMode(true); + initializeConfiguration(); + initOMRatisConf(); + StorageContainerManager scm; + ReconServer reconServer = null; + try { + createSCMService(); + createOMService(); + if (includeRecon) { + configureRecon(); + reconServer = new ReconServer(); + reconServer.execute(new String[] {}); + } + } catch (AuthenticationException ex) { + throw new IOException("Unable to build MiniOzoneCluster. ", ex); + } + + final List<HddsDatanodeService> hddsDatanodes = createHddsDatanodes( + activeSCMs, reconServer); + + MiniOzoneHAClusterImpl cluster = new MiniOzoneHAClusterImpl(conf, + activeOMs, inactiveOMs, activeSCMs, inactiveSCMs, + hddsDatanodes, omServiceId, scmServiceId, reconServer); + + if (startDataNodes) { + cluster.startHddsDatanodes(); + } + return cluster; + } + + protected void initOMRatisConf() { + conf.setBoolean(OMConfigKeys.OZONE_OM_RATIS_ENABLE_KEY, true); + conf.setInt(OMConfigKeys.OZONE_OM_HANDLER_COUNT_KEY, numOfOmHandlers); + + // If test change the following config values we will respect, + // otherwise we will set lower timeout values. + long defaultDuration = OMConfigKeys.OZONE_OM_RATIS_MINIMUM_TIMEOUT_DEFAULT + .getDuration(); + long curRatisRpcTimeout = conf.getTimeDuration( + OMConfigKeys.OZONE_OM_RATIS_MINIMUM_TIMEOUT_KEY, + defaultDuration, TimeUnit.MILLISECONDS); + conf.setTimeDuration(OMConfigKeys.OZONE_OM_RATIS_MINIMUM_TIMEOUT_KEY, + defaultDuration == curRatisRpcTimeout ? + RATIS_RPC_TIMEOUT : curRatisRpcTimeout, TimeUnit.MILLISECONDS); + + long defaultNodeFailureTimeout = + OMConfigKeys.OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_DEFAULT. + getDuration(); + long curNodeFailureTimeout = conf.getTimeDuration( + OMConfigKeys.OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_KEY, + defaultNodeFailureTimeout, + OMConfigKeys.OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_DEFAULT. 
+ getUnit()); + conf.setTimeDuration( + OMConfigKeys.OZONE_OM_RATIS_SERVER_FAILURE_TIMEOUT_DURATION_KEY, + curNodeFailureTimeout == defaultNodeFailureTimeout ? + NODE_FAILURE_TIMEOUT : curNodeFailureTimeout, + TimeUnit.MILLISECONDS); + } + + /** + * Start OM service with multiple OMs. + */ + protected List<OzoneManager> createOMService() throws IOException, + AuthenticationException { + + List<OzoneManager> omList = Lists.newArrayList(); + + int retryCount = 0; + int basePort = 10000; + + while (true) { + try { + basePort = 10000 + RANDOM.nextInt(1000) * 4; + initOMHAConfig(basePort); + + for (int i = 1; i<= numOfOMs; i++) { + // Set nodeId + String nodeId = OM_NODE_ID_PREFIX + i; + OzoneConfiguration config = new OzoneConfiguration(conf); + config.set(OMConfigKeys.OZONE_OM_NODE_ID_KEY, nodeId); + // Set the OM http(s) address to null so that the cluster picks + // up the address set with service ID and node ID in initHAConfig + config.set(OMConfigKeys.OZONE_OM_HTTP_ADDRESS_KEY, ""); + config.set(OMConfigKeys.OZONE_OM_HTTPS_ADDRESS_KEY, ""); + + // Set metadata/DB dir base path + String metaDirPath = path + "/" + nodeId; + config.set(OZONE_METADATA_DIRS, metaDirPath); + // OMStorage omStore = new OMStorage(config); + // initializeOmStorage(omStore); + OzoneManager.omInit(config); + OzoneManager om = OzoneManager.createOm(config); + if (certClient != null) { + om.setCertClient(certClient); + } + omList.add(om); + + if (i <= numOfActiveOMs) { + om.start(); + activeOMs.add(om); + LOG.info("Started OzoneManager RPC server at {}", + om.getOmRpcServerAddr()); + } else { + inactiveOMs.add(om); + LOG.info("Intialized OzoneManager at {}. This OM is currently " + + "inactive (not running).", om.getOmRpcServerAddr()); + } + } + + // Set default OM address to point to the first OM. 
Clients would + // try connecting to this address by default + conf.set(OMConfigKeys.OZONE_OM_ADDRESS_KEY, + NetUtils.getHostPortString(omList.get(0).getOmRpcServerAddr())); + + break; + } catch (BindException e) { + for (OzoneManager om : omList) { + om.stop(); + om.join(); + LOG.info("Stopping OzoneManager server at {}", + om.getOmRpcServerAddr()); + } + omList.clear(); + ++retryCount; + LOG.info("MiniOzoneHACluster port conflicts, retried {} times", + retryCount); + } + } + return omList; + } + + /** + * Start OM service with multiple OMs. + */ + protected List<StorageContainerManager> createSCMService() + throws IOException, AuthenticationException { + List<StorageContainerManager> scmList = Lists.newArrayList(); + + int retryCount = 0; + int basePort = 12000; + + while (true) { + try { + basePort = 12000 + RANDOM.nextInt(1000) * 4; + initSCMHAConfig(basePort); + + for (int i = 1; i<= numOfSCMs; i++) { + // Set nodeId + String nodeId = SCM_NODE_ID_PREFIX + i; + String metaDirPath = path + "/" + nodeId; + OzoneConfiguration scmConfig = new OzoneConfiguration(conf); + scmConfig.set(OZONE_METADATA_DIRS, metaDirPath); + scmConfig.set(ScmConfigKeys.OZONE_SCM_NODE_ID_KEY, nodeId); + scmConfig.set(ScmConfigKeys.OZONE_SCM_HTTP_ADDRESS_KEY, ""); + scmConfig.set(ScmConfigKeys.OZONE_SCM_HTTPS_ADDRESS_KEY, ""); + scmConfig.setBoolean(ScmConfigKeys.OZONE_SCM_HA_ENABLE_KEY, true); + + // TODO: set SCM HA configs + + configureSCM(); + if (i == 1) { + StorageContainerManager.scmInit(scmConfig, clusterId); + } else { + StorageContainerManager.scmBootstrap(scmConfig); + } + StorageContainerManager scm = TestUtils.getScmSimple(scmConfig); + HealthyPipelineSafeModeRule rule = + scm.getScmSafeModeManager().getHealthyPipelineSafeModeRule(); + if (rule != null) { + // Set threshold to wait for safe mode exit - + // this is needed since a pipeline is marked open only after + // leader election. 
+ rule.setHealthyPipelineThresholdCount(numOfDatanodes / 3); + } + scmList.add(scm); + + if (i <= numOfActiveSCMs) { + scm.start(); + activeSCMs.add(scm); + LOG.info("Started SCM RPC server at {}", + scm.getClientProtocolServer()); + } else { + inactiveSCMs.add(scm); + LOG.info("Intialized SCM at {}. This SCM is currently " + + "inactive (not running).", scm.getClientProtocolServer()); + } + } + + + break; + } catch (BindException e) { + for (StorageContainerManager scm : scmList) { + scm.stop(); + scm.join(); + LOG.info("Stopping StorageContainerManager server at {}", + scm.getClientProtocolServer()); + } + scmList.clear(); + ++retryCount; + LOG.info("MiniOzoneHACluster port conflicts, retried {} times", + retryCount); + } + } + return scmList; + } + + protected void configureSCM() { Review comment: It will be called from the superclass. ---------------------------------------------------------------- This is an automated message from the Apache Git Service. To respond to the message, please log on to GitHub and use the URL above to go to the specific comment. For queries about this service, please contact Infrastructure at: [email protected] --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
