This is an automated email from the ASF dual-hosted git repository. xyuanlu pushed a commit to branch helix-gateway-service in repository https://gitbox.apache.org/repos/asf/helix.git
commit 0c6187ebd7fd8bb2889b15f36b19f41dec300785 Author: Junkai Xue <junkai....@gmail.com> AuthorDate: Mon Jul 22 13:27:43 2024 -0700 Build HelixGateway integration test base (#2842) This test base contains all the setups, including: 1. Mock participants that run alongside gateway services. 2. Controller with start/stop. 3. Resource creation. 4. Only the Online/Offline state model. 5. Gateway services with start/stop (left empty for now, until the gateway service is implemented). Also add a dummy test that extends the test base to make sure all setups except the gateway services are ready. --- helix-gateway/src/test/conf/testng.xml | 2 +- .../java/org/apache/helix/gateway/DummyTest.java | 21 + .../helix/gateway/base/HelixGatewayTestBase.java | 141 ++++ .../helix/gateway/base/ZookeeperTestBase.java | 814 ++++++++++++++++++++ .../base/manager/ClusterControllerManager.java | 49 ++ .../helix/gateway/base/manager/ClusterManager.java | 135 ++++ .../base/manager/MockParticipantManager.java | 118 +++ .../base/statemodel/MockOFModelFactory.java | 54 ++ .../gateway/base/statemodel/MockOFStateModel.java | 65 ++ .../gateway/base/statemodel/MockTransition.java | 41 ++ .../apache/helix/gateway/base/util/TestHelper.java | 817 +++++++++++++++++++++ 11 files changed, 2256 insertions(+), 1 deletion(-) diff --git a/helix-gateway/src/test/conf/testng.xml b/helix-gateway/src/test/conf/testng.xml index 19446d49b..f77eab885 100644 --- a/helix-gateway/src/test/conf/testng.xml +++ b/helix-gateway/src/test/conf/testng.xml @@ -21,7 +21,7 @@ <suite name="Suite" parallel="false"> <test name="Test" preserve-order="true"> <packages> - <package name="org.apache.helix.helix.gateway.*"/> + <package name="org.apache.helix.gateway.*"/> </packages> </test> </suite> diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/DummyTest.java b/helix-gateway/src/test/java/org/apache/helix/gateway/DummyTest.java new file mode 100644 index 000000000..7511cb3a3 --- /dev/null +++ 
b/helix-gateway/src/test/java/org/apache/helix/gateway/DummyTest.java
@@ -0,0 +1,21 @@
+package org.apache.helix.gateway;
+
+import org.apache.helix.gateway.base.HelixGatewayTestBase;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class DummyTest extends HelixGatewayTestBase {
+
+  @BeforeClass
+  public void beforeClass() {
+    _numParticipants = 5;
+    super.beforeClass();
+  }
+
+  @Test
+  public void testSetups() {
+    createResource("TestDB_1", 4, 2);
+    Assert.assertTrue(_clusterVerifier.verify());
+  }
+}
diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/base/HelixGatewayTestBase.java b/helix-gateway/src/test/java/org/apache/helix/gateway/base/HelixGatewayTestBase.java
new file mode 100644
index 000000000..06cf45cd1
--- /dev/null
+++ b/helix-gateway/src/test/java/org/apache/helix/gateway/base/HelixGatewayTestBase.java
@@ -0,0 +1,141 @@
+package org.apache.helix.gateway.base;
+
+import java.util.List;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.gateway.base.manager.ClusterControllerManager;
+import org.apache.helix.gateway.base.manager.MockParticipantManager;
+import org.apache.helix.gateway.base.util.TestHelper;
+import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.collections.Lists;
+
+public class HelixGatewayTestBase extends ZookeeperTestBase {
+  protected static final int START_PORT = 12918;
+  protected String _clusterName = "TEST_CLUSTER"; // change the cluster name for each test class
+  protected int _numParticipants = 3;
+  protected int _numGatewayInstances = 3;
+  protected ClusterControllerManager _controller;
+  protected List<MockParticipantManager> _participants;
+  protected ConfigAccessor _configAccessor;
+  protected BestPossibleExternalViewVerifier _clusterVerifier;
+
+  /** Creates the cluster, then starts the controller, the participants and the gateway service. */
+  @BeforeClass
+  public void beforeClass() {
+    _participants = Lists.newArrayList();
+    // Share the suite ZK client; ConfigAccessor(String) would open its own connection, never closed.
+    _configAccessor = new ConfigAccessor(_gZkClient);
+    _gSetupTool.getClusterManagementTool().addCluster(_clusterName, true);
+    controllerManagement(true);
+    startParticipants();
+    startGatewayService();
+  }
+
+  @AfterClass
+  public void afterClass() {
+    controllerManagement(false);
+    stopParticipants(true);
+    stopGatewayService();
+    _gSetupTool.getClusterManagementTool().dropCluster(_clusterName);
+  }
+
+  /**
+   * Start or stop the controller
+   * @param start true to start the controller, false to stop the controller
+   */
+  private void controllerManagement(boolean start) {
+    String controllerName = CONTROLLER_PREFIX + "_0";
+
+    if (start) {
+      _controller = new ClusterControllerManager(ZK_ADDR, _clusterName, controllerName);
+      _controller.syncStart();
+
+      _clusterVerifier =
+          new BestPossibleExternalViewVerifier.Builder(_clusterName).setZkClient(_gZkClient).setWaitTillVerify(
+              TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build();
+    } else {
+      _controller.syncStop();
+    }
+
+    enablePersistBestPossibleAssignment(_gZkClient, _clusterName, start);
+  }
+
+  /**
+   * Create participants with the given number of participants defined by _numParticipants
+   */
+  private void startParticipants() {
+    for (int i = 0; i < _numParticipants; i++) {
+      String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+      _gSetupTool.addInstanceToCluster(_clusterName, storageNodeName);
+
+      // start dummy participants
+      MockParticipantManager participant =
+          new MockParticipantManager(ZK_ADDR, _clusterName, storageNodeName);
+      participant.syncStart();
+      _participants.add(participant);
+    }
+  }
+
+  /**
+   * Stop all participants and, if {@code dropParticipants} is true, drop them from the cluster.
+   *
+   * @param dropParticipants true to also drop the participants, false to only stop them
+   */
+  private void stopParticipants(boolean dropParticipants) {
+    for (MockParticipantManager participant : _participants) {
+      participant.syncStop();
+      if (dropParticipants) {
+        _gSetupTool.getClusterManagementTool().dropInstance(_clusterName,
+            _configAccessor.getInstanceConfig(_clusterName, participant.getInstanceName()));
+      }
+    }
+    _participants.clear();
+  }
+
+  /**
+   * Create a resource with the given number of partitions and replicas
+   * WARNING: 1) only the OnlineOffline state model is supported
+   *          2) only the FULL_AUTO rebalance mode is supported
+   *
+   * Default rebalance strategy is CrushEdRebalanceStrategy
+   *
+   * @param resourceName name of the resource
+   * @param numPartitions number of partitions
+   * @param numReplicas number of replicas
+   */
+  protected void createResource(String resourceName, int numPartitions, int numReplicas) {
+    createResource(resourceName, numPartitions, numReplicas,
+        "org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy");
+  }
+
+  /**
+   * Create a resource with the given number of partitions, replicas and rebalance strategy
+   *
+   * @param resourceName name of the resource
+   * @param numPartitions number of partitions
+   * @param numReplicas number of replicas
+   * @param rebalanceStrategy fully qualified class name of the rebalance strategy
+   */
+  protected void createResource(String resourceName, int numPartitions, int numReplicas, String rebalanceStrategy) {
+    _gSetupTool.getClusterManagementTool().addResource(_clusterName, resourceName, numPartitions, "OnlineOffline",
+        "FULL_AUTO", rebalanceStrategy);
+    _gSetupTool.getClusterManagementTool().rebalance(_clusterName, resourceName, numReplicas);
+  }
+
+  /**
+   * Start the gateway service with the given number of gateway instances
+   * defined by _numGatewayInstances
+   */
+  protected void startGatewayService() {
+    // Start the gateway service
+  }
+
+  /**
+   * Stop the gateway service
+   */
+  protected void stopGatewayService() {
+    // Stop the gateway service
+  }
+}
diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/base/ZookeeperTestBase.java b/helix-gateway/src/test/java/org/apache/helix/gateway/base/ZookeeperTestBase.java
new file mode
100644
index 000000000..8d87bc4f3
--- /dev/null
+++ b/helix-gateway/src/test/java/org/apache/helix/gateway/base/ZookeeperTestBase.java
@@ -0,0 +1,814 @@
+package org.apache.helix.gateway.base;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+
+import com.google.common.base.Preconditions;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyPathBuilder;
+import org.apache.helix.SystemPropertyKeys;
+import org.apache.helix.api.config.HelixConfigProperty;
+import org.apache.helix.constants.InstanceConstants;
+import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
+import org.apache.helix.controller.pipeline.Pipeline;
+import org.apache.helix.controller.pipeline.Stage;
+import org.apache.helix.controller.pipeline.StageContext;
+import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
+import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
+import org.apache.helix.controller.stages.AttributeName;
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.gateway.base.util.TestHelper;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.OnlineOfflineSMD;
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.StateModelConfigGenerator;
+import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
+import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
+import org.apache.helix.zookeeper.zkclient.ZkServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.ITestContext;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.BeforeSuite;
+
+/**
+ * TestNG base class that starts in-memory ZooKeeper server(s) once per suite and offers helpers
+ * for configuring clusters, resources, ideal states and live instances in tests.
+ */
+public abstract class ZookeeperTestBase {
+  private static final Logger LOG = LoggerFactory.getLogger(ZookeeperTestBase.class);
+  private static final String MULTI_ZK_PROPERTY_KEY = "multiZk";
+  private static final String NUM_ZK_PROPERTY_KEY = "numZk";
+
+  protected static ZkServer _zkServer;
+  protected static HelixZkClient _gZkClient;
+  protected static ClusterSetup _gSetupTool;
+  protected static BaseDataAccessor<ZNRecord> _baseAccessor;
+  protected static MBeanServerConnection _server = ManagementFactory.getPlatformMBeanServer();
+
+  // cluster name -> (instance name -> dedicated ZK client that owns the live-instance node)
+  private final Map<String, Map<String, HelixZkClient>> _liveInstanceOwners = new HashMap<>();
+
+  private static final String ZK_PREFIX = "localhost:";
+  private static final int ZK_START_PORT = 2283;
+  public static final String ZK_ADDR = ZK_PREFIX + ZK_START_PORT;
+  protected static final String CLUSTER_PREFIX = "CLUSTER";
+  protected static final String CONTROLLER_CLUSTER_PREFIX = "CONTROLLER_CLUSTER";
+  protected final String CONTROLLER_PREFIX = "controller";
+  protected final String PARTICIPANT_PREFIX = "localhost";
+  private static final long MANUAL_GC_PAUSE = 4000L;
+
+  /*
+   * Multiple ZK references
+   */
+  // The following maps hold ZK connect string as keys
+  protected static final Map<String, ZkServer> _zkServerMap = new HashMap<>();
+  protected static final Map<String, HelixZkClient> _helixZkClientMap = new HashMap<>();
+  protected static final Map<String, ClusterSetup> _clusterSetupMap = new HashMap<>();
+  protected static final Map<String, BaseDataAccessor> _baseDataAccessorMap = new HashMap<>();
+
+  /**
+   * Prints the machine's total physical memory (bytes) and the JVM's total and free memory (MB)
+   * to stdout.
+   */
+  public static void reportPhysicalMemory() {
+    com.sun.management.OperatingSystemMXBean os =
+        (com.sun.management.OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
+    long physicalMemorySize = os.getTotalPhysicalMemorySize();
+    System.out.println("************ SYSTEM Physical Memory:" + physicalMemorySize);
+
+    long MB = 1024 * 1024;
+    Runtime runtime = Runtime.getRuntime();
+    long free = runtime.freeMemory() / MB;
+    long total = runtime.totalMemory() / MB;
+    System.out.println("************ total memory:" + total + " free memory:" + free);
+  }
+
+  /**
+   * Starts the suite-wide in-memory ZooKeeper(s) and their clients, then unregisters all JMX MBeans.
+   * Run with {@code -DmultiZk=true -DnumZk=N} to start N ZooKeepers on consecutive ports.
+   */
+  @BeforeSuite
+  public void beforeSuite() throws Exception {
+    // TODO: use logging.properties file to config java.util.logging.Logger levels
+    java.util.logging.Logger topJavaLogger = java.util.logging.Logger.getLogger("");
+    topJavaLogger.setLevel(Level.WARNING);
+
+    // Since the ZOOKEEPER-2693 fix, ZK four-letter-word commands must be whitelisted to execute
+    System.setProperty("zookeeper.4lw.commands.whitelist", "*");
+    System.setProperty(SystemPropertyKeys.CONTROLLER_MESSAGE_PURGE_DELAY, "3000");
+
+    // Start in-memory ZooKeepers
+    // If multi-ZooKeeper is enabled, start more ZKs. Otherwise, just set up one ZK
+    int numZkToStart = 1;
+    // parseBoolean is null-safe and case-insensitive: true only for "true"
+    if (Boolean.parseBoolean(System.getProperty(MULTI_ZK_PROPERTY_KEY))) {
+      String numZkFromConfig = System.getProperty(NUM_ZK_PROPERTY_KEY);
+      if (numZkFromConfig != null) {
+        try {
+          numZkToStart = Math.max(Integer.parseInt(numZkFromConfig), numZkToStart);
+        } catch (NumberFormatException e) {
+          // Keep the parse failure as the cause so the bad value is visible in the report
+          Assert.fail("Failed to parse the number of ZKs from config!", e);
+        }
+      } else {
+        Assert.fail("multiZk config is set but numZk config is missing!");
+      }
+    }
+
+    // Start numZkToStart ZooKeepers on ports ZK_START_PORT, ZK_START_PORT + 1, ...
+    for (int i = 0; i < numZkToStart; i++) {
+      startZooKeeper(i);
+    }
+
+    // Set the references for backward-compatibility with a single ZK environment
+    _zkServer = _zkServerMap.get(ZK_ADDR);
+    _gZkClient = _helixZkClientMap.get(ZK_ADDR);
+    _gSetupTool = _clusterSetupMap.get(ZK_ADDR);
+    _baseAccessor = _baseDataAccessorMap.get(ZK_ADDR);
+
+    // Clean up all JMX objects
+    cleanupJMXObjects();
+  }
+
+  /**
+   * Starts an additional in-memory ZooKeeper for testing.
+   * @param i index to be added to the ZK port to avoid conflicts
+   */
+  private static synchronized void startZooKeeper(int i) {
+    String zkAddress = ZK_PREFIX + (ZK_START_PORT + i);
+    _zkServerMap.computeIfAbsent(zkAddress, ZookeeperTestBase::createZookeeperServer);
+    _helixZkClientMap.computeIfAbsent(zkAddress, ZookeeperTestBase::createZkClient);
+    _clusterSetupMap.computeIfAbsent(zkAddress, key -> new ClusterSetup(_helixZkClientMap.get(key)));
+    _baseDataAccessorMap.computeIfAbsent(zkAddress, key -> new ZkBaseDataAccessor(_helixZkClientMap.get(key)));
+  }
+
+  /**
+   * Starts a ZooKeeper server at {@code zkAddress}.
+   *
+   * @throws IllegalArgumentException wrapping any failure to start the server
+   */
+  private static ZkServer createZookeeperServer(String zkAddress) {
+    try {
+      return Preconditions.checkNotNull(TestHelper.startZkServer(zkAddress));
+    } catch (Exception e) {
+      throw new IllegalArgumentException("Failed to start zookeeper server at " + zkAddress, e);
+    }
+  }
+
+  /** Builds a dedicated ZK client for {@code zkAddress} that uses the ZNRecordSerializer. */
+  private static HelixZkClient createZkClient(String zkAddress) {
+    HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
+    clientConfig.setZkSerializer(new ZNRecordSerializer());
+    return DedicatedZkClientFactory.getInstance()
+        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress), clientConfig);
+  }
+
+  /**
+   * Unregisters all JMX MBeans, then closes every ZK resource created by startZooKeeper and
+   * forgets it.
+   */
+  @AfterSuite
+  public void afterSuite() throws IOException {
+    // Clean up all JMX objects
+    cleanupJMXObjects();
+
+    synchronized (ZookeeperTestBase.class) {
+      // Close all ZK resources
+      _baseDataAccessorMap.values().forEach(BaseDataAccessor::close);
+      _clusterSetupMap.values().forEach(ClusterSetup::close);
+      _helixZkClientMap.values().forEach(HelixZkClient::close);
+      _zkServerMap.values().forEach(TestHelper::stopZkServer);
+      // startZooKeeper uses computeIfAbsent, so closed entries left here would be handed out
+      // again if another suite starts in the same JVM.
+      _baseDataAccessorMap.clear();
+      _clusterSetupMap.clear();
+      _helixZkClientMap.clear();
+      _zkServerMap.clear();
+    }
+  }
+
+  /** Unregisters all JMX MBeans, then requests a GC and sleeps MANUAL_GC_PAUSE milliseconds. */
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    cleanupJMXObjects();
+    // Giving each test some time to settle (such as gc pause, etc).
+    // Note that this is the best effort we could make to stabilize tests, not a complete solution
+    Runtime.getRuntime().gc();
+    Thread.sleep(MANUAL_GC_PAUSE);
+  }
+
+  /** Stores the current time (ms) as the "StartTime" attribute of the test context. */
+  @BeforeMethod
+  public void beforeTest(Method testMethod, ITestContext testContext) {
+    testContext.setAttribute("StartTime", System.currentTimeMillis());
+  }
+
+  /** Unregisters every MBean on the platform MBean server. */
+  protected void cleanupJMXObjects() throws IOException {
+    for (ObjectName mbean : _server.queryNames(null, null)) {
+      try {
+        _server.unregisterMBean(mbean);
+      } catch (Exception e) {
+        // Best effort: a failure for one MBean is deliberately ignored
+      }
+    }
+  }
+
+  /** @return the simple (unqualified) name of the concrete test class */
+  protected String getShortClassName() {
+    return this.getClass().getSimpleName();
+  }
+
+  /**
+   * @return the instance name of the cluster's controller leader, or null when no leader node
+   *         exists
+   */
+  protected String getCurrentLeader(HelixZkClient zkClient, String clusterName) {
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(zkClient));
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+    LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
+    if (leader == null) {
+      return null;
+    }
+    return leader.getInstanceName();
+  }
+
+  /** Sets the cluster config's persist-best-possible-assignment flag. */
+  protected void enablePersistBestPossibleAssignment(HelixZkClient zkClient, String clusterName,
+      Boolean enabled) {
+    ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
+    ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
+    clusterConfig.setPersistBestPossibleAssignment(enabled);
+    configAccessor.setClusterConfig(clusterName, clusterConfig);
+  }
+
+  /** Sets the cluster config's persist-intermediate-assignment flag. */
+  protected void enablePersistIntermediateAssignment(HelixZkClient zkClient, String clusterName,
+      Boolean enabled) {
+    ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
+    ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
+    clusterConfig.setPersistIntermediateAssignment(enabled);
+    configAccessor.setClusterConfig(clusterName, clusterConfig);
+  }
+
+  /** Sets the cluster config's topology-aware flag. */
+  protected void enableTopologyAwareRebalance(HelixZkClient zkClient, String clusterName,
+      Boolean enabled) {
+    ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
+    ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
+    clusterConfig.setTopologyAwareEnabled(enabled);
+    configAccessor.setClusterConfig(clusterName, clusterConfig);
+  }
+
+  /** Enables or disables delayed rebalance at the cluster level. */
+  protected void enableDelayRebalanceInCluster(HelixZkClient zkClient, String clusterName,
+      boolean enabled) {
+    ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
+    ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
+    clusterConfig.setDelayRebalaceEnabled(enabled);
+    configAccessor.setClusterConfig(clusterName, clusterConfig);
+  }
+
+  /** Enables or disables delayed rebalance for a single instance. */
+  protected void enableDelayRebalanceInInstance(HelixZkClient zkClient, String clusterName,
+      String instanceName, boolean enabled) {
+    ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
+    InstanceConfig instanceConfig = configAccessor.getInstanceConfig(clusterName, instanceName);
+    instanceConfig.setDelayRebalanceEnabled(enabled);
+    configAccessor.setInstanceConfig(clusterName, instanceName, instanceConfig);
+  }
+
+  /** Enables or disables delayed rebalance at the cluster level and sets the rebalance delay. */
+  protected void enableDelayRebalanceInCluster(HelixZkClient zkClient, String clusterName,
+      boolean enabled, long delay) {
+    ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
+    ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
+    clusterConfig.setDelayRebalaceEnabled(enabled);
+    clusterConfig.setRebalanceDelayTime(delay);
+    configAccessor.setClusterConfig(clusterName, clusterConfig);
+  }
+
+  /**
+   * Enables P2P messaging in the cluster config, or removes the P2P_MESSAGE_ENABLED field when
+   * {@code enable} is false.
+   */
+  protected void enableP2PInCluster(String clusterName, ConfigAccessor configAccessor,
+      boolean enable) {
+    ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
+    if (enable) {
+      // enable p2p message in cluster.
+      clusterConfig.enableP2PMessage(true);
+    } else {
+      // Drop the field entirely rather than writing an explicit false
+      clusterConfig.getRecord().getSimpleFields()
+          .remove(HelixConfigProperty.P2P_MESSAGE_ENABLED.name());
+    }
+    configAccessor.setClusterConfig(clusterName, clusterConfig);
+  }
+
+  /**
+   * Writes a resource config with P2P messaging enabled, or removes the P2P_MESSAGE_ENABLED field
+   * from an existing resource config when {@code enable} is false.
+   */
+  protected void enableP2PInResource(String clusterName, ConfigAccessor configAccessor,
+      String dbName, boolean enable) {
+    if (enable) {
+      ResourceConfig resourceConfig =
+          new ResourceConfig.Builder(dbName).setP2PMessageEnabled(true).build();
+      configAccessor.setResourceConfig(clusterName, dbName, resourceConfig);
+    } else {
+      // remove P2P Message in resource config
+      ResourceConfig resourceConfig = configAccessor.getResourceConfig(clusterName, dbName);
+      if (resourceConfig != null) {
+        resourceConfig.getRecord().getSimpleFields()
+            .remove(HelixConfigProperty.P2P_MESSAGE_ENABLED.name());
+        configAccessor.setResourceConfig(clusterName, dbName, resourceConfig);
+      }
+    }
+  }
+
+  /** Sets the cluster-level rebalance delay. */
+  protected void setDelayTimeInCluster(HelixZkClient zkClient, String clusterName, long delay) {
+    ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
+    ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
+    clusterConfig.setRebalanceDelayTime(delay);
+    configAccessor.setClusterConfig(clusterName, clusterConfig);
+  }
+
+  /** Sets the cluster config's last on-demand rebalance timestamp. */
+  protected void setLastOnDemandRebalanceTimeInCluster(HelixZkClient zkClient,
+      String clusterName, long lastOnDemandTime) {
+    ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
+    ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
+    clusterConfig.setLastOnDemandRebalanceTimestamp(lastOnDemandTime);
+    configAccessor.setClusterConfig(clusterName, clusterConfig);
+  }
+
+  /** Creates a DelayedAutoRebalancer resource using the AutoRebalanceStrategy. */
+  protected IdealState createResourceWithDelayedRebalance(String clusterName, String db,
+      String stateModel, int numPartition, int replica, int minActiveReplica, long delay) {
+    return createResourceWithDelayedRebalance(clusterName, db, stateModel, numPartition, replica,
+        minActiveReplica, delay, AutoRebalanceStrategy.class.getName());
+  }
+
+  /** Creates a DelayedAutoRebalancer resource using the given rebalance strategy class name. */
+  protected IdealState createResourceWithDelayedRebalance(String clusterName, String db,
+      String stateModel, int numPartition, int replica, int minActiveReplica, long delay,
+      String rebalanceStrategy) {
+    return createResource(clusterName, db, stateModel, numPartition, replica, minActiveReplica,
+        delay, DelayedAutoRebalancer.class.getName(), rebalanceStrategy);
+  }
+
+  /** Creates a WagedRebalancer resource (no explicit delay, no rebalance strategy). */
+  protected IdealState createResourceWithWagedRebalance(String clusterName, String db,
+      String stateModel, int numPartition, int replica, int minActiveReplica) {
+    return createResource(clusterName, db, stateModel, numPartition, replica, minActiveReplica,
+        -1, WagedRebalancer.class.getName(), null);
+  }
+
+  /**
+   * Adds {@code db} as a FULL_AUTO resource if it does not exist yet, then sets min active
+   * replicas, delayed rebalance (delay only when {@code delay > 0}) and the rebalancer class,
+   * rebalances it to {@code replica} replicas and returns the resulting ideal state. An existing
+   * resource is reused as-is for partition count and state model.
+   */
+  private IdealState createResource(String clusterName, String db, String stateModel,
+      int numPartition, int replica, int minActiveReplica, long delay, String rebalancerClassName,
+      String rebalanceStrategy) {
+    IdealState idealState =
+        _gSetupTool.getClusterManagementTool().getResourceIdealState(clusterName, db);
+    if (idealState == null) {
+      _gSetupTool.addResourceToCluster(clusterName, db, numPartition, stateModel,
+          IdealState.RebalanceMode.FULL_AUTO.toString(), rebalanceStrategy);
+    }
+
+    idealState = _gSetupTool.getClusterManagementTool().getResourceIdealState(clusterName, db);
+    idealState.setMinActiveReplicas(minActiveReplica);
+    if (!idealState.isDelayRebalanceEnabled()) {
+      idealState.setDelayRebalanceEnabled(true);
+    }
+    if (delay > 0) {
+      idealState.setRebalanceDelay(delay);
+    }
+    idealState.setRebalancerClassName(rebalancerClassName);
+    _gSetupTool.getClusterManagementTool().setResourceIdealState(clusterName, db, idealState);
+    _gSetupTool.rebalanceStorageCluster(clusterName, db, replica);
+    idealState = _gSetupTool.getClusterManagementTool().getResourceIdealState(clusterName, db);
+
+    return idealState;
+  }
+
+  /**
+   * Builds (without persisting) an ideal state whose partitions are assigned round-robin to
+   * {@code instanceNames}, every replica in the ONLINE state regardless of {@code stateModelDef}.
+   */
+  protected IdealState createIdealState(String resourceGroupName, String instanceGroupTag,
+      List<String> instanceNames, int numPartition, int replica, String rebalanceMode,
+      String stateModelDef) {
+    IdealState is = _gSetupTool.createIdealStateForResourceGroup(resourceGroupName,
+        instanceGroupTag, numPartition, replica, rebalanceMode, stateModelDef);
+
+    // setup initial partition->instance mapping.
+    int nodeIdx = 0;
+    int numNode = instanceNames.size();
+    assert (numNode >= replica);
+    for (int i = 0; i < numPartition; i++) {
+      String partitionName = resourceGroupName + "_" + i;
+      for (int j = 0; j < replica; j++) {
+        is.setPartitionState(partitionName, instanceNames.get((nodeIdx + j) % numNode),
+            OnlineOfflineSMD.States.ONLINE.toString());
+      }
+      nodeIdx++;
+    }
+
+    return is;
+  }
+
+  /**
+   * Creates a SEMI_AUTO resource through {@code clusterSetup} and sets the same preference list
+   * on every partition.
+   */
+  protected void createDBInSemiAuto(ClusterSetup clusterSetup, String clusterName, String dbName,
+      List<String> preferenceList, String stateModelDef, int numPartition, int replica) {
+    clusterSetup.addResourceToCluster(clusterName, dbName, numPartition, stateModelDef,
+        IdealState.RebalanceMode.SEMI_AUTO.toString());
+    clusterSetup.rebalanceStorageCluster(clusterName, dbName, replica);
+
+    // Read back through the same ClusterSetup that created the resource (not the global
+    // _gSetupTool), so this also works when clusterSetup targets a different ZooKeeper.
+    IdealState is =
+        clusterSetup.getClusterManagementTool().getResourceIdealState(clusterName, dbName);
+    for (String p : is.getPartitionSet()) {
+      is.setPreferenceList(p, preferenceList);
+    }
+    clusterSetup.getClusterManagementTool().setResourceIdealState(clusterName, dbName, is);
+  }
+
+  /**
+   * Validate that every partition of the ideal state has, in the external view, a top-state
+   * replica and at least {@code minActiveReplica} replicas in an active state.
+   */
+  protected void validateMinActiveAndTopStateReplica(IdealState is, ExternalView ev,
+      int minActiveReplica, int numNodes) {
+    StateModelDefinition stateModelDef =
+        BuiltInStateModelDefinitions.valueOf(is.getStateModelDefRef()).getStateModelDefinition();
+    String topState = stateModelDef.getStatesPriorityList().get(0);
+    int replica = Integer.parseInt(is.getReplicas());
+
+    Map<String, Integer> stateCount = stateModelDef.getStateCountMap(numNodes, replica);
+    Set<String> activeStates = stateCount.keySet();
+
+    for (String partition : is.getPartitionSet()) {
+      Map<String, String> assignmentMap = ev.getRecord().getMapField(partition);
+      Assert.assertNotNull(assignmentMap,
+          is.getResourceName() + "'s best possible assignment is null for partition " + partition);
+      Assert.assertFalse(assignmentMap.isEmpty(),
+          is.getResourceName() + "'s partition " + partition + " has no best possible map in IS.");
+
+      boolean hasTopState = false;
+      int activeReplica = 0;
+      for (String state : assignmentMap.values()) {
+        if (topState.equalsIgnoreCase(state)) {
+          hasTopState = true;
+        }
+        if (activeStates.contains(state)) {
+          activeReplica++;
+        }
+      }
+
+      Assert.assertTrue(hasTopState, String.format("%s missing %s replica", partition, topState));
+      Assert.assertTrue(activeReplica >= minActiveReplica,
+          String.format("%s has less active replica %d than required %d", partition, activeReplica,
+              minActiveReplica));
+    }
+  }
+
+  /**
+   * Runs one controller pipeline stage on {@code event}, attaching {@code manager} to the event
+   * first.
+   */
+  protected void runStage(HelixManager manager, ClusterEvent event, Stage stage) throws Exception {
+    event.addAttribute(AttributeName.helixmanager.name(), manager);
+    StageContext context = new StageContext();
+    stage.init(context);
+    stage.preProcess();
+
+    // AbstractAsyncBaseStage runs asynchronously and implements its main logic in execute(),
+    // so call execute() directly instead of process() for those stages
+    if (stage instanceof AbstractAsyncBaseStage) {
+      ((AbstractAsyncBaseStage) stage).execute(event);
+    } else {
+      stage.process(event);
+    }
+    stage.postProcess();
+  }
+
+  /** Asserts that the instance config node and the instance node both exist (or both do not). */
+  public void verifyInstance(HelixZkClient zkClient, String clusterName, String instance,
+      boolean wantExists) {
+    String instanceConfigsPath = PropertyPathBuilder.instanceConfig(clusterName);
+    String instanceConfigPath = instanceConfigsPath + "/" + instance;
+    String instancePath = PropertyPathBuilder.instance(clusterName, instance);
+    // TestNG's assertEquals takes (actual, expected)
+    Assert.assertEquals(zkClient.exists(instanceConfigPath), wantExists);
+    Assert.assertEquals(zkClient.exists(instancePath), wantExists);
+  }
+
+  /** Asserts whether the resource's ideal state node exists. */
+  public void verifyResource(HelixZkClient zkClient, String clusterName, String resource,
+      boolean wantExists) {
+    String resourcePath = PropertyPathBuilder.idealState(clusterName, resource);
+    Assert.assertEquals(zkClient.exists(resourcePath), wantExists);
+  }
+
+  /** Asserts the enabled flag stored in the instance config. */
+  public void verifyEnabled(HelixZkClient zkClient, String clusterName, String instance,
+      boolean wantEnabled) {
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+    InstanceConfig config = accessor.getProperty(keyBuilder.instanceConfig(instance));
+    Assert.assertEquals(config.getInstanceEnabled(), wantEnabled);
+  }
+
+  /**
+   * Asserts each partition's replica count: the preference list size for SEMI_AUTO, the
+   * instance-state map size for CUSTOMIZED. Other rebalance modes are not checked.
+   */
+  public void verifyReplication(HelixZkClient zkClient, String clusterName, String resource,
+      int repl) {
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+    IdealState idealState = accessor.getProperty(keyBuilder.idealStates(resource));
+    for (String partitionName : idealState.getPartitionSet()) {
+      if (idealState.getRebalanceMode() == IdealState.RebalanceMode.SEMI_AUTO) {
+        Assert.assertEquals(idealState.getPreferenceList(partitionName).size(), repl);
+      } else if (idealState.getRebalanceMode() == IdealState.RebalanceMode.CUSTOMIZED) {
+        Assert.assertEquals(idealState.getInstanceStateMap(partitionName).size(), repl);
+      }
+    }
+  }
+
+  /** Writes the MasterSlave, LeaderStandby and OnlineOffline state model definitions. */
+  protected void setupStateModel(String clusterName) {
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_gZkClient));
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+    StateModelDefinition masterSlave =
+        new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave());
+    accessor.setProperty(keyBuilder.stateModelDef(masterSlave.getId()), masterSlave);
+
+    StateModelDefinition leaderStandby =
+        new StateModelDefinition(StateModelConfigGenerator.generateConfigForLeaderStandby());
+    accessor.setProperty(keyBuilder.stateModelDef(leaderStandby.getId()), leaderStandby);
+
+    StateModelDefinition onlineOffline =
+        new StateModelDefinition(StateModelConfigGenerator.generateConfigForOnlineOffline());
+    accessor.setProperty(keyBuilder.stateModelDef(onlineOffline.getId()), onlineOffline);
+  }
+
+  /** Builds (without sending) a message with the given states, resource and target. */
+  protected Message createMessage(Message.MessageType type, String msgId, String fromState,
+      String toState, String resourceName, String tgtName) {
+    Message msg = new Message(type.toString(), msgId);
+    msg.setFromState(fromState);
+    msg.setToState(toState);
+    msg.getRecord().setSimpleField(Message.Attributes.RESOURCE_NAME.toString(), resourceName);
+    msg.setTgtName(tgtName);
+    return msg;
+  }
+
+  /**
+   * Writes one SEMI_AUTO MasterSlave ideal state per resource. Partition p's list field holds the
+   * instances "localhost_" + nodes[(p + r) % nodes.length] for r in [0, replicas).
+   */
+  protected List<IdealState> setupIdealState(String clusterName, int[] nodes, String[] resources,
+      int partitions, int replicas) {
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_gZkClient));
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+    List<IdealState> idealStates = new ArrayList<>();
+    for (String resourceName : resources) {
+      IdealState idealState = new IdealState(resourceName);
+      for (int p = 0; p < partitions; p++) {
+        List<String> value = new ArrayList<>();
+        for (int r = 0; r < replicas; r++) {
+          int n = nodes[(p + r) % nodes.length];
+          value.add("localhost_" + n);
+        }
+        idealState.getRecord().setListField(resourceName + "_" + p, value);
+      }
+
+      idealState.setReplicas(Integer.toString(replicas));
+      idealState.setStateModelDefRef("MasterSlave");
+      idealState.setRebalanceMode(IdealState.RebalanceMode.SEMI_AUTO);
+      idealState.setNumPartitions(partitions);
+      idealStates.add(idealState);
+
+      accessor.setProperty(keyBuilder.idealStates(resourceName), idealState);
+    }
+    return idealStates;
+  }
+
+  /** Closes every ZK client created by setupLiveInstances and forgets them. */
+  @AfterClass
+  public void cleanupLiveInstanceOwners() throws InterruptedException {
+    String testClassName = this.getShortClassName();
+    System.out.println("AfterClass: " + testClassName + " called.");
+    for (Map<String, HelixZkClient> clientMap : _liveInstanceOwners.values()) {
+      for (HelixZkClient client : clientMap.values()) {
+        client.close();
+      }
+      clientMap.clear();
+    }
+    _liveInstanceOwners.clear();
+  }
+
+  /**
+   * Creates a live-instance node for each "localhost_" + port in {@code liveInstances}, each
+   * written through its own dedicated ZK client (reused across calls for the same instance).
+   *
+   * @return the live instances as read back from ZooKeeper
+   */
+  protected List<LiveInstance> setupLiveInstances(String clusterName, int[] liveInstances) {
+    HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
+    clientConfig.setZkSerializer(new ZNRecordSerializer());
+
+    List<LiveInstance> result = new ArrayList<>();
+
+    for (int i = 0; i < liveInstances.length; i++) {
+      String instance = "localhost_" + liveInstances[i];
+      Map<String, HelixZkClient> clientMap =
+          _liveInstanceOwners.computeIfAbsent(clusterName, k -> new HashMap<>());
+      // computeIfAbsent builds a dedicated client only when this instance has none yet, so
+      // repeated calls do not open (and leak) extra ZK connections.
+      HelixZkClient client = clientMap.computeIfAbsent(instance,
+          k -> DedicatedZkClientFactory.getInstance()
+              .buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR), clientConfig));
+
+      ZKHelixDataAccessor accessor =
+          new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(client));
+      PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+      LiveInstance liveInstance = new LiveInstance(instance);
+      // Keep setting the session id in the deprecated field to ensure the same behavior as a real participant.
+      // Note the participant is doing so for backward compatibility.
+      liveInstance.setSessionId(Long.toHexString(client.getSessionId()));
+      // Please refer to the version requirement here: helix-core/src/main/resources/cluster-manager-version.properties
+      // Ensuring version compatibility can avoid the warning message during test.
+      liveInstance.setHelixVersion("0.4");
+      accessor.setProperty(keyBuilder.liveInstance(instance), liveInstance);
+      result.add(accessor.getProperty(keyBuilder.liveInstance(instance)));
+    }
+    return result;
+  }
+
+  /** Removes all live-instance nodes of the cluster and closes the ZK clients that owned them. */
+  protected void deleteLiveInstances(String clusterName) {
+    ZKHelixDataAccessor accessor =
+        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_gZkClient));
+    PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+    Map<String, HelixZkClient> clientMap = _liveInstanceOwners.getOrDefault(clusterName, Collections.emptyMap());
+
+    for (String liveInstance : accessor.getChildNames(keyBuilder.liveInstances())) {
+      accessor.removeProperty(keyBuilder.liveInstance(liveInstance));
+
+      HelixZkClient client = clientMap.remove(liveInstance);
+      if (client != null) {
+        client.close();
+      }
+    }
+
+    if (clientMap.isEmpty()) {
+      _liveInstanceOwners.remove(clusterName);
+    }
+  }
+
+  protected void setupInstances(String clusterName, int[] instances) {
+    HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+    for (int i = 0; i < instances.length; i++) {
+      String instance = "localhost_" + instances[i];
+      InstanceConfig instanceConfig = new InstanceConfig(instance);
+      instanceConfig.setHostName("localhost");
+      instanceConfig.setPort("" + instances[i]);
+      instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
+      admin.addInstance(clusterName,
instanceConfig); + } + } + + protected void runPipeline(ClusterEvent event, Pipeline pipeline, boolean shouldThrowException) + throws Exception { + try { + pipeline.handle(event); + pipeline.finish(); + } catch (Exception e) { + if (shouldThrowException) { + throw e; + } else { + LOG.error("Exception while executing pipeline: {}. Will not continue to next pipeline", + pipeline, e); + } + } + } + + protected void runStage(ClusterEvent event, Stage stage) throws Exception { + StageContext context = new StageContext(); + stage.init(context); + stage.preProcess(); + + // AbstractAsyncBaseStage will run asynchronously, and it's main logics are implemented in + // execute() function call + // TODO (harry): duplicated code in ZkIntegrationTestBase, consider moving runStage() + // to a shared library + if (stage instanceof AbstractAsyncBaseStage) { + ((AbstractAsyncBaseStage) stage).execute(event); + } else { + stage.process(event); + } + stage.postProcess(); + } + + protected void deleteCluster(String clusterName) { + TestHelper.dropCluster(clusterName, _gZkClient, _gSetupTool); + } + + /** + * Poll for the existence (or lack thereof) of a specific Helix property + * @param clazz the HelixProeprty subclass + * @param accessor connected HelixDataAccessor + * @param key the property key to look up + * @param shouldExist true if the property should exist, false otherwise + * @return the property if found, or null if it does not exist + */ + protected <T extends HelixProperty> T pollForProperty(Class<T> clazz, HelixDataAccessor accessor, + PropertyKey key, boolean shouldExist) throws InterruptedException { + final int POLL_TIMEOUT = 5000; + final int POLL_INTERVAL = 50; + T property = accessor.getProperty(key); + int timeWaited = 0; + while (((shouldExist && property == null) || (!shouldExist && property != null)) + && timeWaited < POLL_TIMEOUT) { + Thread.sleep(POLL_INTERVAL); + timeWaited += POLL_INTERVAL; + property = accessor.getProperty(key); + } + return property; + } + 
+ /** + * Ensures that external view and current state are empty + */ + protected static class EmptyZkVerifier implements ClusterStateVerifier.ZkVerifier { + private final String _clusterName; + private final String _resourceName; + private final HelixZkClient _zkClient; + + /** + * Instantiate the verifier + * @param clusterName the cluster to verify + * @param resourceName the resource to verify + */ + public EmptyZkVerifier(String clusterName, String resourceName) { + _clusterName = clusterName; + _resourceName = resourceName; + + _zkClient = DedicatedZkClientFactory.getInstance() + .buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR)); + _zkClient.setZkSerializer(new ZNRecordSerializer()); + } + + @Override + public boolean verify() { + BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_zkClient); + HelixDataAccessor accessor = new ZKHelixDataAccessor(_clusterName, baseAccessor); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); + ExternalView externalView = accessor.getProperty(keyBuilder.externalView(_resourceName)); + + // verify external view empty + if (externalView != null) { + for (String partition : externalView.getPartitionSet()) { + Map<String, String> stateMap = externalView.getStateMap(partition); + if (stateMap != null && !stateMap.isEmpty()) { + LOG.error("External view not empty for " + partition); + return false; + } + } + } + + // verify current state empty + List<String> liveParticipants = accessor.getChildNames(keyBuilder.liveInstances()); + for (String participant : liveParticipants) { + List<String> sessionIds = accessor.getChildNames(keyBuilder.sessions(participant)); + for (String sessionId : sessionIds) { + CurrentState currentState = + accessor.getProperty(keyBuilder.currentState(participant, sessionId, _resourceName)); + Map<String, String> partitionStateMap = currentState.getPartitionStateMap(); + if (partitionStateMap != null && !partitionStateMap.isEmpty()) { + LOG.error("Current state not 
empty for " + participant); + return false; + } + } + + List<String> taskSessionIds = + accessor.getChildNames(keyBuilder.taskCurrentStateSessions(participant)); + for (String sessionId : taskSessionIds) { + CurrentState taskCurrentState = accessor + .getProperty(keyBuilder.taskCurrentState(participant, sessionId, _resourceName)); + Map<String, String> taskPartitionStateMap = taskCurrentState.getPartitionStateMap(); + if (taskPartitionStateMap != null && !taskPartitionStateMap.isEmpty()) { + LOG.error("Task current state not empty for " + participant); + return false; + } + } + } + return true; + } + + @Override + public ZkClient getZkClient() { + return (ZkClient) _zkClient; + } + + @Override + public String getClusterName() { + return _clusterName; + } + } + +} diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/base/manager/ClusterControllerManager.java b/helix-gateway/src/test/java/org/apache/helix/gateway/base/manager/ClusterControllerManager.java new file mode 100644 index 000000000..4fecde88f --- /dev/null +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/base/manager/ClusterControllerManager.java @@ -0,0 +1,49 @@ +package org.apache.helix.gateway.base.manager; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.HelixManagerProperty; +import org.apache.helix.InstanceType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * The standalone cluster controller class + */ +public class ClusterControllerManager extends ClusterManager { + private static Logger LOG = LoggerFactory.getLogger(ClusterControllerManager.class); + + public ClusterControllerManager(String zkAddr, String clusterName) { + this(zkAddr, clusterName, "controller"); + } + + public ClusterControllerManager(String zkAddr, String clusterName, String controllerName) { + super(zkAddr, clusterName, controllerName, InstanceType.CONTROLLER); + } + + public ClusterControllerManager(String clusterName, HelixManagerProperty helixManagerProperty) { + super(clusterName, "controller", InstanceType.CONTROLLER, null, null, helixManagerProperty); + } + + @Override + public void finalize() { + super.finalize(); + } +} diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/base/manager/ClusterManager.java b/helix-gateway/src/test/java/org/apache/helix/gateway/base/manager/ClusterManager.java new file mode 100644 index 000000000..3858513e5 --- /dev/null +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/base/manager/ClusterManager.java @@ -0,0 +1,135 @@ +package org.apache.helix.gateway.base.manager; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.List; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.atomic.AtomicLong; + +import org.apache.helix.HelixManagerProperty; +import org.apache.helix.InstanceType; +import org.apache.helix.manager.zk.CallbackHandler; +import org.apache.helix.manager.zk.HelixManagerStateListener; +import org.apache.helix.manager.zk.ZKHelixManager; +import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class ClusterManager extends ZKHelixManager implements Runnable { + private static Logger LOG = LoggerFactory.getLogger(ClusterManager.class); + private static final int DISCONNECT_WAIT_TIME_MS = 3000; + + private static AtomicLong UID = new AtomicLong(10000); + private long _uid; + + private final String _clusterName; + private final String _instanceName; + private final InstanceType _type; + + protected CountDownLatch _startCountDown = new CountDownLatch(1); + protected CountDownLatch _stopCountDown = new CountDownLatch(1); + protected CountDownLatch _waitStopFinishCountDown = new CountDownLatch(1); + + protected boolean _started = false; + + protected Thread _watcher; + + protected ClusterManager(String zkAddr, String clusterName, String instanceName, + InstanceType type) { + super(clusterName, instanceName, type, zkAddr); + _clusterName = clusterName; + _instanceName = instanceName; + _type = type; + _uid = UID.getAndIncrement(); + } + protected ClusterManager(String clusterName, String instanceName, InstanceType instanceType, + String 
zkAddress, HelixManagerStateListener stateListener, + HelixManagerProperty helixManagerProperty) { + super(clusterName, instanceName, instanceType, zkAddress, stateListener, helixManagerProperty); + _clusterName = clusterName; + _instanceName = instanceName; + _type = instanceType; + _uid = UID.getAndIncrement(); + } + + public void syncStop() { + _stopCountDown.countDown(); + try { + _waitStopFinishCountDown.await(); + _started = false; + } catch (InterruptedException e) { + LOG.error("Interrupted waiting for finish", e); + } + } + + // This should not be called more than once because HelixManager.connect() should not be called more than once. + public void syncStart() { + if (_started) { + throw new RuntimeException( + "Helix Controller already started. Do not call syncStart() more than once."); + } else { + _started = true; + } + + _watcher = new Thread(this); + _watcher.setName(String + .format("ClusterManager_Watcher_%s_%s_%s_%d", _clusterName, _instanceName, _type.name(), _uid)); + LOG.debug("ClusterManager_watcher_{}_{}_{}_{} started, stacktrace {}", _clusterName, _instanceName, _type.name(), _uid, Thread.currentThread().getStackTrace()); + _watcher.start(); + + try { + _startCountDown.await(); + } catch (InterruptedException e) { + LOG.error("Interrupted waiting for start", e); + } + } + + @Override + public void run() { + try { + connect(); + _startCountDown.countDown(); + _stopCountDown.await(); + } catch (Exception e) { + LOG.error("exception running controller-manager", e); + } finally { + _startCountDown.countDown(); + disconnect(); + _waitStopFinishCountDown.countDown(); + } + } + + @Override + public void finalize() { + _watcher.interrupt(); + try { + _watcher.join(DISCONNECT_WAIT_TIME_MS); + } catch (InterruptedException e) { + LOG.error("ClusterManager watcher cleanup in the finalize method was interrupted.", e); + } finally { + if (isConnected()) { + LOG.warn( + "The HelixManager ({}-{}-{}) is still connected after {} ms wait. 
This is a potential resource leakage!", + _clusterName, _instanceName, _type.name(), DISCONNECT_WAIT_TIME_MS); + } + } + } +} + diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/base/manager/MockParticipantManager.java b/helix-gateway/src/test/java/org/apache/helix/gateway/base/manager/MockParticipantManager.java new file mode 100644 index 000000000..34047c525 --- /dev/null +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/base/manager/MockParticipantManager.java @@ -0,0 +1,118 @@ +package org.apache.helix.gateway.base.manager; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.util.concurrent.CountDownLatch; + +import org.apache.helix.HelixCloudProperty; +import org.apache.helix.HelixManagerProperty; +import org.apache.helix.HelixPropertyFactory; +import org.apache.helix.InstanceType; +import org.apache.helix.gateway.base.statemodel.MockOFModelFactory; +import org.apache.helix.gateway.base.statemodel.MockTransition; +import org.apache.helix.model.BuiltInStateModelDefinitions; +import org.apache.helix.participant.StateMachineEngine; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MockParticipantManager extends ClusterManager { + private static final Logger LOG = LoggerFactory.getLogger(MockParticipantManager.class); + + protected int _transDelay = 10; + + protected MockOFModelFactory _ofModelFactory; + protected HelixCloudProperty _helixCloudProperty; + + public MockParticipantManager(String zkAddr, String clusterName, String instanceName) { + this(zkAddr, clusterName, instanceName, 10); + } + + public MockParticipantManager(String zkAddr, String clusterName, String instanceName, + int transDelay) { + this(zkAddr, clusterName, instanceName, transDelay, null); + } + + public MockParticipantManager(String zkAddr, String clusterName, String instanceName, + int transDelay, HelixCloudProperty helixCloudProperty) { + this(zkAddr, clusterName, instanceName, transDelay, helixCloudProperty, + HelixPropertyFactory.getInstance().getHelixManagerProperty(zkAddr, clusterName)); + } + + public MockParticipantManager(String zkAddr, String clusterName, String instanceName, + int transDelay, HelixCloudProperty helixCloudProperty, + HelixManagerProperty helixManagerProperty) { + super(clusterName, instanceName, InstanceType.PARTICIPANT, zkAddr, null, helixManagerProperty); + _transDelay = transDelay; + _ofModelFactory = new MockOFModelFactory(null); + _helixCloudProperty = helixCloudProperty; + } + + public MockParticipantManager(String clusterName, String instanceName, + HelixManagerProperty 
helixManagerProperty, int transDelay, + HelixCloudProperty helixCloudProperty) { + super(clusterName, instanceName, InstanceType.PARTICIPANT, null, null, helixManagerProperty); + _transDelay = transDelay; + _ofModelFactory = new MockOFModelFactory(); + _helixCloudProperty = helixCloudProperty; + } + + public void setTransition(MockTransition transition) { + _ofModelFactory.setTrasition(transition); + } + + /** + * This method should be called before syncStart() called after syncStop() + */ + public void reset() { + syncStop(); + _startCountDown = new CountDownLatch(1); + _stopCountDown = new CountDownLatch(1); + _waitStopFinishCountDown = new CountDownLatch(1); + } + + @Override + public void run() { + try { + StateMachineEngine stateMach = getStateMachineEngine(); + stateMach.registerStateModelFactory(BuiltInStateModelDefinitions.OnlineOffline.name(), + _ofModelFactory); + connect(); + _startCountDown.countDown(); + + _stopCountDown.await(); + } catch (InterruptedException e) { + String msg = + "participant: " + getInstanceName() + ", " + Thread.currentThread().getName() + + " is interrupted"; + LOG.info(msg); + } catch (Exception e) { + LOG.error("exception running participant-manager", e); + } finally { + _startCountDown.countDown(); + + disconnect(); + _waitStopFinishCountDown.countDown(); + } + } + @Override + public void finalize() { + super.finalize(); + } +} diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/base/statemodel/MockOFModelFactory.java b/helix-gateway/src/test/java/org/apache/helix/gateway/base/statemodel/MockOFModelFactory.java new file mode 100644 index 000000000..cf656b29d --- /dev/null +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/base/statemodel/MockOFModelFactory.java @@ -0,0 +1,54 @@ +package org.apache.helix.gateway.base.statemodel; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.participant.statemachine.StateModelFactory; + +// mock master slave state model factory +public class MockOFModelFactory extends StateModelFactory<MockOFStateModel> { + private MockTransition _transition; + + public MockOFModelFactory() { + this(null); + } + + public MockOFModelFactory(MockTransition transition) { + _transition = transition; + } + + public void setTrasition(MockTransition transition) { + _transition = transition; + + // set existing transition + for (String resource : getResourceSet()) { + for (String partition : getPartitionSet(resource)) { + MockOFStateModel stateModel = getStateModel(resource, partition); + stateModel.setTransition(transition); + } + } + } + + @Override + public MockOFStateModel createNewStateModel(String resourceName, String partitionKey) { + MockOFStateModel model = new MockOFStateModel(_transition); + + return model; + } +} diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/base/statemodel/MockOFStateModel.java b/helix-gateway/src/test/java/org/apache/helix/gateway/base/statemodel/MockOFStateModel.java new file mode 100644 index 000000000..0e0350e1f --- /dev/null +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/base/statemodel/MockOFStateModel.java @@ -0,0 +1,65 @@ +package 
org.apache.helix.gateway.base.statemodel; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import org.apache.helix.NotificationContext; +import org.apache.helix.model.Message; +import org.apache.helix.participant.statemachine.StateModel; +import org.apache.helix.participant.statemachine.StateModelInfo; +import org.apache.helix.participant.statemachine.Transition; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +// mock master-slave state model +@StateModelInfo(initialState = "OFFLINE", states = { + "ONLINE", "ERROR" +}) +public class MockOFStateModel extends StateModel { + private static Logger LOG = LoggerFactory.getLogger(MockOFStateModel.class); + + protected MockTransition _transition; + + public MockOFStateModel(MockTransition transition) { + _transition = transition; + } + + public void setTransition(MockTransition transition) { + _transition = transition; + } + + @Transition(to = "*", from = "*") + public void generalTransitionHandle(Message message, NotificationContext context) + throws InterruptedException { + LOG.info(String + .format("Resource %s partition %s becomes %s from %s", message.getResourceName(), + message.getPartitionName(), message.getToState(), 
message.getFromState())); + if (_transition != null) { + _transition.doTransition(message, context); + } + } + + @Override + public void reset() { + LOG.info("Default MockMSStateModel.reset() invoked"); + if (_transition != null) { + _transition.doReset(); + } + } +} diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/base/statemodel/MockTransition.java b/helix-gateway/src/test/java/org/apache/helix/gateway/base/statemodel/MockTransition.java new file mode 100644 index 000000000..d61ba841d --- /dev/null +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/base/statemodel/MockTransition.java @@ -0,0 +1,41 @@ +package org.apache.helix.gateway.base.statemodel; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import org.apache.helix.NotificationContext; +import org.apache.helix.model.Message; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public class MockTransition { + private static Logger LOG = LoggerFactory.getLogger(MockTransition.class); + + // called by state model transition functions + public void doTransition(Message message, NotificationContext context) + throws InterruptedException { + LOG.info("default doTransition() invoked"); + } + + // called by state model reset function + public void doReset() { + LOG.info("default doReset() invoked"); + } + +} diff --git a/helix-gateway/src/test/java/org/apache/helix/gateway/base/util/TestHelper.java b/helix-gateway/src/test/java/org/apache/helix/gateway/base/util/TestHelper.java new file mode 100644 index 000000000..681f39205 --- /dev/null +++ b/helix-gateway/src/test/java/org/apache/helix/gateway/base/util/TestHelper.java @@ -0,0 +1,817 @@ +package org.apache.helix.gateway.base.util; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
+ */ + +import java.io.File; +import java.io.IOException; +import java.lang.reflect.Method; +import java.net.ServerSocket; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeMap; +import java.util.TreeSet; +import java.util.concurrent.Callable; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.TimeUnit; + +import org.apache.commons.io.FileUtils; +import org.apache.helix.BaseDataAccessor; +import org.apache.helix.HelixManager; +import org.apache.helix.PropertyKey.Builder; +import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy; +import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy; +import org.apache.helix.manager.zk.ZKHelixAdmin; +import org.apache.helix.manager.zk.ZKHelixDataAccessor; +import org.apache.helix.manager.zk.ZNRecordSerializer; +import org.apache.helix.manager.zk.ZkBaseDataAccessor; +import org.apache.helix.model.CurrentState; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.IdealState.RebalanceMode; +import org.apache.helix.model.Message; +import org.apache.helix.model.Message.MessageType; +import org.apache.helix.model.StateModelDefinition; +import org.apache.helix.model.StateModelDefinition.StateModelDefinitionProperty; +import org.apache.helix.store.zk.ZNode; +import org.apache.helix.tools.ClusterSetup; +import org.apache.helix.util.ZKClientPool; +import org.apache.helix.zookeeper.api.client.HelixZkClient; +import org.apache.helix.zookeeper.api.client.RealmAwareZkClient; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory; +import org.apache.helix.zookeeper.zkclient.IDefaultNameSpace; +import org.apache.helix.zookeeper.zkclient.ZkClient; +import 
org.apache.helix.zookeeper.zkclient.ZkServer;
import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException;
import org.apache.zookeeper.data.Stat;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testng.Assert;

/**
 * Static helpers for the helix-gateway integration tests. They cover:
 * <ul>
 * <li>starting and stopping an embedded ZooKeeper;</li>
 * <li>creating and dropping test clusters;</li>
 * <li>polling verifiers;</li>
 * <li>running callables concurrently;</li>
 * <li>comparing ZooKeeper contents against a cached snapshot.</li>
 * </ul>
 */
public class TestHelper {
  private static final Logger LOG = LoggerFactory.getLogger(TestHelper.class);
  public static final long WAIT_DURATION = 60 * 1000L; // 60 seconds
  public static final int DEFAULT_REBALANCE_PROCESSING_WAIT_TIME = 1500;

  /**
   * Returns a port that was unused at the time of the call. Binding to {@code null} lets the
   * system pick a free ephemeral port.
   * <p>
   * The socket is released before returning, so another process may take the port before the
   * caller binds to it.
   */
  public static int getRandomPort() throws IOException {
    // try-with-resources closes the socket even when bind() fails.
    try (ServerSocket sock = new ServerSocket()) {
      sock.bind(null);
      return sock.getLocalPort();
    }
  }

  /**
   * Starts an embedded ZooKeeper server at {@code zkAddress}. Previous data and log directories
   * are deleted first.
   */
  static public ZkServer startZkServer(final String zkAddress) throws Exception {
    List<String> empty = Collections.emptyList();
    return TestHelper.startZkServer(zkAddress, empty, true);
  }

  /**
   * Starts an embedded ZooKeeper server. Previous data and log directories are deleted first.
   * {@code rootNamespace} is deleted recursively from the server's default-namespace callback.
   */
  static public ZkServer startZkServer(final String zkAddress, final String rootNamespace)
      throws Exception {
    List<String> rootNamespaces = new ArrayList<String>();
    rootNamespaces.add(rootNamespace);
    return TestHelper.startZkServer(zkAddress, rootNamespaces, true);
  }

  /**
   * Starts an embedded ZooKeeper server. Previous data and log directories are deleted first.
   * Each entry of {@code rootNamespaces} is deleted recursively from the default-namespace
   * callback.
   */
  static public ZkServer startZkServer(final String zkAddress, final List<String> rootNamespaces)
      throws Exception {
    return startZkServer(zkAddress, rootNamespaces, true);
  }

  /**
   * Starts an embedded ZooKeeper server.
   *
   * @param zkAddress address in {@code host:port} form. The port is parsed from the text after
   *          the last ':'. The address, with ':' replaced by '_', names the data and log
   *          directories under /tmp.
   * @param rootNamespaces paths deleted recursively from the default-namespace callback; may be
   *          null, in which case nothing is deleted
   * @param overwrite when true, the data and log directories are deleted before starting
   * @return the started server
   */
  static public ZkServer startZkServer(final String zkAddress, final List<String> rootNamespaces,
      boolean overwrite) throws Exception {
    System.out.println(
        "Start zookeeper at " + zkAddress + " in thread " + Thread.currentThread().getName());

    String zkDir = zkAddress.replace(':', '_');
    final String logDir = "/tmp/" + zkDir + "/logs";
    final String dataDir = "/tmp/" + zkDir + "/dataDir";
    if (overwrite) {
      FileUtils.deleteDirectory(new File(dataDir));
      FileUtils.deleteDirectory(new File(logDir));
    }
    ZKClientPool.reset();

    IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace() {
      @Override
      public void createDefaultNameSpace(ZkClient zkClient) {
        if (rootNamespaces == null) {
          return;
        }

        for (String rootNamespace : rootNamespaces) {
          try {
            zkClient.deleteRecursive(rootNamespace);
          } catch (Exception e) {
            LOG.error("fail to deleteRecursive path:" + rootNamespace, e);
          }
        }
      }
    };

    int port = Integer.parseInt(zkAddress.substring(zkAddress.lastIndexOf(':') + 1));
    ZkServer zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, port);
    zkServer.start();

    return zkServer;
  }

  /**
   * Shuts down {@code zkServer} if it is non-null.
   */
  static public void stopZkServer(ZkServer zkServer) {
    if (zkServer != null) {
      zkServer.shutdown();
      System.out.println(
          "Shut down zookeeper at port " + zkServer.getPort() + " in thread " + Thread
              .currentThread().getName());
    }
  }

  /**
   * Creates an empty cluster named {@code clusterName}. The {@code true} flag passed to
   * {@code addCluster} requests that any existing cluster be recreated.
   */
  public static void setupEmptyCluster(HelixZkClient zkClient, String clusterName) {
    ZKHelixAdmin admin = new ZKHelixAdmin(zkClient);
    admin.addCluster(clusterName, true);
  }

  /**
   * Converts the given elements into a mutable {@link HashSet}.
   *
   * @param s elements to include; duplicates collapse
   * @return a new set containing the elements
   */
  @SafeVarargs
  public static <T> Set<T> setOf(T... s) {
    return new HashSet<T>(Arrays.asList(s));
  }

  /**
   * Repeatedly invokes a public static boolean verifier method of this class. Each invocation
   * is preceded by a sleep. Polling stops when the verifier returns true or the timeout
   * elapses; the last result is then asserted.
   * <p>
   * Exceptions thrown while invoking the verifier are logged, not rethrown.
   *
   * @param verifierName name of a public static method of {@link TestHelper}. If the name is
   *          overloaded, whichever overload reflection returns first is used.
   * @param timeout how long to keep polling, in milliseconds
   * @param args arguments passed to the verifier on every invocation
   */
  public static void verifyWithTimeout(String verifierName, long timeout, Object... args) {
    final long sleepInterval = 1000; // in ms
    final int loop = (int) (timeout / sleepInterval) + 1;

    // Fail fast on an unknown verifier. Otherwise the resulting NPE would be caught below and
    // the check would silently pass.
    Method verifier = TestHelper.getMethod(verifierName);
    if (verifier == null) {
      Assert.fail("No verifier method named " + verifierName + " in TestHelper");
    }

    try {
      boolean result = false;
      int i = 0;
      for (; i < loop; i++) {
        Thread.sleep(sleepInterval);
        // verifier should be static method; a null/non-Boolean return counts as "not yet".
        result = Boolean.TRUE.equals(verifier.invoke(null, args));
        if (result) {
          break;
        }
      }

      System.err.println(
          verifierName + ": wait " + ((i + 1) * sleepInterval) + "ms to verify " + " (" + result
              + ")");
      LOG.debug("args:" + Arrays.toString(args));

      if (!result) {
        LOG.error(verifierName + " fails");
        LOG.error("args:" + Arrays.toString(args));
      }

      Assert.assertTrue(result);
    } catch (InterruptedException e) {
      // Restore the interrupt flag so the caller can observe it.
      Thread.currentThread().interrupt();
      LOG.error("Exception in verify: " + verifierName, e);
    } catch (Exception e) {
      LOG.error("Exception in verify: " + verifierName, e);
    }
  }

  /**
   * Returns the first public method (including inherited ones) of this class named
   * {@code name}, or null if there is none.
   */
  private static Method getMethod(String name) {
    for (Method method : TestHelper.class.getMethods()) {
      if (name.equals(method.getName())) {
        return method;
      }
    }
    return null;
  }

  /**
   * Returns true when the resource has no per-partition entries anywhere it is checked:
   * <ul>
   * <li>the current state of each session of each given instance;</li>
   * <li>the task current state of each such session;</li>
   * <li>the resource's external view.</li>
   * </ul>
   * A missing znode counts as empty.
   */
  public static boolean verifyEmptyCurStateAndExtView(String clusterName, String resourceName,
      Set<String> instanceNames, String zkAddr) {
    HelixZkClient zkClient = SharedZkClientFactory.getInstance()
        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
    zkClient.setZkSerializer(new ZNRecordSerializer());

    try {
      ZKHelixDataAccessor accessor =
          new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
      Builder keyBuilder = accessor.keyBuilder();

      for (String instanceName : instanceNames) {
        List<String> sessionIds = accessor.getChildNames(keyBuilder.sessions(instanceName));

        for (String sessionId : sessionIds) {
          CurrentState curState =
              accessor.getProperty(keyBuilder.currentState(instanceName, sessionId, resourceName));
          if (curState != null && !curState.getRecord().getMapFields().isEmpty()) {
            return false;
          }

          CurrentState taskCurState = accessor
              .getProperty(keyBuilder.taskCurrentState(instanceName, sessionId, resourceName));
          if (taskCurState != null && !taskCurState.getRecord().getMapFields().isEmpty()) {
            return false;
          }
        }
      }

      // The external view is per resource. Check it once, outside the instance loop, so it is
      // also verified when instanceNames is empty.
      ExternalView extView = accessor.getProperty(keyBuilder.externalView(resourceName));
      return extView == null || extView.getRecord().getMapFields().isEmpty();
    } finally {
      zkClient.close();
    }
  }

  /**
   * Returns true if {@code manager} is not connected.
   */
  public static boolean verifyNotConnected(HelixManager manager) {
    return !manager.isConnected();
  }

  /**
   * Same as the {@link RebalanceMode}-taking overload, with
   * {@link RebalanceMode#SEMI_AUTO} as the mode.
   */
  public static void setupCluster(String clusterName, String zkAddr, int startPort,
      String participantNamePrefix, String resourceNamePrefix, int resourceNb, int partitionNb,
      int nodesNb, int replica, String stateModelDef, boolean doRebalance) throws Exception {
    TestHelper
        .setupCluster(clusterName, zkAddr, startPort, participantNamePrefix, resourceNamePrefix,
            resourceNb, partitionNb, nodesNb, replica, stateModelDef, RebalanceMode.SEMI_AUTO,
            doRebalance);
  }

  /**
   * Creates {@code clusterName} from scratch, recursively deleting it first if it already
   * exists. It then:
   * <ul>
   * <li>adds {@code nodesNb} instances named {@code participantNamePrefix + "_" + port}, with
   * consecutive ports starting at {@code startPort};</li>
   * <li>adds {@code resourceNb} resources named {@code resourceNamePrefix + index}, each with
   * {@code partitionNb} partitions.</li>
   * </ul>
   * FULL_AUTO resources use {@link CrushEdRebalanceStrategy}; other modes use the default
   * strategy. When {@code doRebalance} is true, each resource is rebalanced with
   * {@code replica} replicas.
   */
  public static void setupCluster(String clusterName, String zkAddr, int startPort,
      String participantNamePrefix, String resourceNamePrefix, int resourceNb, int partitionNb,
      int nodesNb, int replica, String stateModelDef, RebalanceMode mode, boolean doRebalance) {
    HelixZkClient zkClient = SharedZkClientFactory.getInstance()
        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
    try {
      if (zkClient.exists("/" + clusterName)) {
        LOG.warn("Cluster already exists:" + clusterName + ". Deleting it");
        zkClient.deleteRecursively("/" + clusterName);
      }

      ClusterSetup setupTool = new ClusterSetup(zkAddr);
      setupTool.addCluster(clusterName, true);

      for (int i = 0; i < nodesNb; i++) {
        int port = startPort + i;
        setupTool.addInstanceToCluster(clusterName, participantNamePrefix + "_" + port);
      }

      for (int i = 0; i < resourceNb; i++) {
        String resourceName = resourceNamePrefix + i;
        setupTool.addResourceToCluster(clusterName, resourceName, partitionNb, stateModelDef,
            mode.name(),
            mode == RebalanceMode.FULL_AUTO ? CrushEdRebalanceStrategy.class.getName()
                : RebalanceStrategy.DEFAULT_REBALANCE_STRATEGY);
        if (doRebalance) {
          setupTool.rebalanceStorageCluster(clusterName, resourceName, replica);
        }
      }
    } finally {
      zkClient.close();
    }
  }

  /**
   * Deletes {@code clusterName} if its root znode exists, using a new {@link ClusterSetup}.
   */
  public static void dropCluster(String clusterName, RealmAwareZkClient zkClient) {
    ClusterSetup setupTool = new ClusterSetup(zkClient);
    dropCluster(clusterName, zkClient, setupTool);
  }

  /**
   * Deletes {@code clusterName} if its root znode exists. If the first attempt fails, it
   * retries once after 3 seconds; a failure of that retry is ignored.
   */
  public static void dropCluster(String clusterName, RealmAwareZkClient zkClient, ClusterSetup setup) {
    String namespace = "/" + clusterName;
    if (zkClient.exists(namespace)) {
      try {
        setup.deleteCluster(clusterName);
      } catch (Exception ex) {
        // Failed to delete, give some more time for connections to drop
        try {
          Thread.sleep(3000L);
          setup.deleteCluster(clusterName);
        } catch (InterruptedException ie) {
          // Keep the interrupt visible to the caller instead of swallowing it.
          Thread.currentThread().interrupt();
        } catch (Exception ignored) {
          // Best effort only: a second failure is deliberately ignored.
        }
      }
    }
  }

  /**
   * Asserts that every listed instance is in {@code state} in the external view.
   *
   * @param stateMap
   *          : "ResourceName/partitionKey" (or "resource_x") -> setOf(instances)
   * @param state
   *          : MASTER|SLAVE|ERROR...
   */
  public static void verifyState(String clusterName, String zkAddr,
      Map<String, Set<String>> stateMap, String state) {
    HelixZkClient zkClient = SharedZkClientFactory.getInstance()
        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
    zkClient.setZkSerializer(new ZNRecordSerializer());

    try {
      ZKHelixDataAccessor accessor =
          new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
      Builder keyBuilder = accessor.keyBuilder();

      for (Map.Entry<String, Set<String>> entry : stateMap.entrySet()) {
        Map<String, String> retMap = getResourceAndPartitionKey(entry.getKey());
        String resGroup = retMap.get("RESOURCE");
        String partitionKey = retMap.get("PARTITION");

        ExternalView extView = accessor.getProperty(keyBuilder.externalView(resGroup));
        // Report a missing view/partition as an assertion failure rather than a bare NPE.
        Assert.assertNotNull(extView, "externalView doesn't exist for " + resGroup);
        Map<String, String> partitionStateMap = extView.getStateMap(partitionKey);
        Assert.assertNotNull(partitionStateMap,
            "externalView doesn't contain partition " + resGroup + "/" + partitionKey);

        for (String instance : entry.getValue()) {
          String actualState = partitionStateMap.get(instance);
          Assert.assertNotNull(actualState,
              "externalView doesn't contain state for " + resGroup + "/" + partitionKey + " on "
                  + instance + " (expect " + state + ")");

          Assert.assertEquals(actualState, state,
              "externalView for " + resGroup + "/" + partitionKey + " on " + instance + " is "
                  + actualState + " (expect " + state + ")");
        }
      }
    } finally {
      zkClient.close();
    }
  }

  /**
   * Splits a state-map key into its resource and partition parts.
   *
   * @param resourcePartition
   *          : key is in form of "resource/partitionKey" or "resource_x". In the latter form
   *          the resource is everything before the last '_' and the partition is the whole
   *          key.
   * @return map with "RESOURCE" and "PARTITION" entries
   * @throws IllegalArgumentException if the key contains neither '/' nor '_'
   */
  private static Map<String, String> getResourceAndPartitionKey(String resourcePartition) {
    String resourceName;
    String partitionName;
    int idx = resourcePartition.indexOf('/');
    if (idx > -1) {
      resourceName = resourcePartition.substring(0, idx);
      partitionName = resourcePartition.substring(idx + 1);
    } else {
      idx = resourcePartition.lastIndexOf('_');
      if (idx < 0) {
        throw new IllegalArgumentException(
            "Expect \"resource/partition\" or \"resource_x\", but was: " + resourcePartition);
      }
      resourceName = resourcePartition.substring(0, idx);
      partitionName = resourcePartition;
    }

    Map<String, String> retMap = new HashMap<String, String>();
    retMap.put("RESOURCE", resourceName);
    retMap.put("PARTITION", partitionName);
    return retMap;
  }

  /**
   * Runs {@code method} on {@code nrThreads} threads that are released together.
   *
   * @param timeout seconds to wait for the start signal and for all threads to finish
   * @return non-null results keyed by {@code "thread_" + threadId}
   */
  public static <T> Map<String, T> startThreadsConcurrently(final int nrThreads,
      final Callable<T> method, final long timeout) {
    // Same behavior as running nrThreads copies of the callable through the list overload.
    return startThreadsConcurrently(Collections.nCopies(nrThreads, method), timeout);
  }

  /**
   * Builds a STATE_TRANSITION message for the "MasterSlave" state model.
   */
  public static Message createMessage(String msgId, String fromState, String toState,
      String tgtName, String resourceName, String partitionName) {
    Message msg = new Message(MessageType.STATE_TRANSITION, msgId);
    msg.setFromState(fromState);
    msg.setToState(toState);
    msg.setTgtName(tgtName);
    msg.setResourceName(resourceName);
    msg.setPartitionName(partitionName);
    msg.setStateModelDef("MasterSlave");

    return msg;
  }

  /**
   * Returns the name of the method that called this one. Frame 2 is used on the assumption
   * that frame 0 is Thread.getStackTrace and frame 1 is this method.
   */
  public static String getTestMethodName() {
    StackTraceElement[] calls = Thread.currentThread().getStackTrace();
    return calls[2].getMethodName();
  }

  /**
   * Returns the simple (package-less) class name of the caller. It uses the same frame-2
   * assumption as {@link #getTestMethodName()}.
   */
  public static String getTestClassName() {
    StackTraceElement[] calls = Thread.currentThread().getStackTrace();
    String fullClassName = calls[2].getClassName();
    return fullClassName.substring(fullClassName.lastIndexOf('.') + 1);
  }

  /**
   * Runs each callable on its own thread; all threads are released together. If the threads
   * do not all finish within the timeout, every thread is interrupted and the results gathered
   * so far are returned.
   *
   * @param methods callables to run, one thread each
   * @param timeout seconds to wait for the start signal and for all threads to finish
   * @return non-null results keyed by {@code "thread_" + threadId}
   */
  public static <T> Map<String, T> startThreadsConcurrently(final List<Callable<T>> methods,
      final long timeout) {
    final int nrThreads = methods.size();
    final CountDownLatch startLatch = new CountDownLatch(1);
    final CountDownLatch finishCounter = new CountDownLatch(nrThreads);
    final Map<String, T> resultsMap = new ConcurrentHashMap<String, T>();
    final List<Thread> threadList = new ArrayList<Thread>(nrThreads);

    for (final Callable<T> method : methods) {
      Thread thread = new Thread() {
        @Override
        public void run() {
          try {
            try {
              boolean isTimeout = !startLatch.await(timeout, TimeUnit.SECONDS);
              if (isTimeout) {
                LOG.error("Timeout while waiting for start latch");
              }
            } catch (InterruptedException ex) {
              LOG.error("Interrupted while waiting for start latch");
              Thread.currentThread().interrupt();
            }

            try {
              T result = method.call();
              if (result != null) {
                resultsMap.put("thread_" + this.getId(), result);
              }
              LOG.debug("result={}", result);
            } catch (Exception e) {
              LOG.error("Exception in executing " + method.getClass().getName(), e);
            }
          } finally {
            // Always count down, even if call() throws an Error. Otherwise the caller would
            // wait for the full timeout.
            finishCounter.countDown();
          }
        }
      };
      threadList.add(thread);
      thread.start();
    }
    startLatch.countDown();

    // wait for all threads to complete
    try {
      boolean isTimeout = !finishCounter.await(timeout, TimeUnit.SECONDS);
      if (isTimeout) {
        LOG.error("Timeout while waiting for finish latch. Interrupt all threads");
        for (Thread thread : threadList) {
          thread.interrupt();
        }
      }
    } catch (InterruptedException e) {
      LOG.error("Interrupted while waiting for finish latch", e);
      Thread.currentThread().interrupt();
    }

    return resultsMap;
  }

  /**
   * Prints every cached node to stdout in path order. Each line shows the data, the sorted
   * child names and the stat.
   */
  public static void printCache(Map<String, ZNode> cache) {
    System.out.println("START:Print cache");
    // TreeMap gives a stable, sorted output order.
    TreeMap<String, ZNode> map = new TreeMap<String, ZNode>(cache);

    for (Map.Entry<String, ZNode> entry : map.entrySet()) {
      ZNode node = entry.getValue();
      TreeSet<String> childSet = new TreeSet<String>();
      childSet.addAll(node.getChildSet());
      // End every entry with a newline so entries with a non-null stat do not run together.
      System.out.println(entry.getKey() + "=" + node.getData() + ", " + childSet + ", "
          + (node.getStat() == null ? "null" : node.getStat()));
    }
    System.out.println("END:Print cache");
  }

  /**
   * Reads {@code path} and all its descendants through {@code zkclient} into {@code map}. Each
   * entry is a {@link ZNode} holding the data, stat and child names. A
   * {@link ZkNoNodeException} while reading a subtree is ignored.
   */
  public static void readZkRecursive(String path, Map<String, ZNode> map, HelixZkClient zkclient) {
    try {
      Stat stat = new Stat();
      ZNRecord record = zkclient.readData(path, stat);
      List<String> childNames = zkclient.getChildren(path);
      ZNode node = new ZNode(path, record, stat);
      node.addChildren(childNames);
      map.put(path, node);

      for (String childName : childNames) {
        String childPath = path + "/" + childName;
        readZkRecursive(childPath, map, zkclient);
      }
    } catch (ZkNoNodeException e) {
      // OK
    }
  }

  /**
   * Reads {@code path} and all its descendants through {@code zkAccessor} into {@code map}.
   * Each entry is a {@link ZNode} holding the data, stat and child names. A
   * {@link ZkNoNodeException} while reading a subtree is ignored.
   */
  public static void readZkRecursive(String path, Map<String, ZNode> map,
      BaseDataAccessor<ZNRecord> zkAccessor) {
    try {
      Stat stat = new Stat();
      ZNRecord record = zkAccessor.get(path, stat, 0);
      List<String> childNames = zkAccessor.getChildNames(path, 0);
      ZNode node = new ZNode(path, record, stat);
      node.addChildren(childNames);
      map.put(path, node);

      if (childNames != null && !childNames.isEmpty()) {
        for (String childName : childNames) {
          String childPath = path + "/" + childName;
          readZkRecursive(childPath, map, zkAccessor);
        }
      }
    } catch (ZkNoNodeException e) {
      // OK
    }
  }

  /**
   * Reads {@code paths} both directly from ZooKeeper and through {@code zkAccessor}, then
   * compares the two snapshots.
   */
  public static boolean verifyZkCache(List<String> paths, BaseDataAccessor<ZNRecord> zkAccessor,
      HelixZkClient zkclient, boolean needVerifyStat) {
    Map<String, ZNode> zkMap = new HashMap<String, ZNode>();
    Map<String, ZNode> cache = new HashMap<String, ZNode>();
    for (String path : paths) {
      readZkRecursive(path, zkMap, zkclient);
      readZkRecursive(path, cache, zkAccessor);
    }

    return verifyZkCache(paths, null, cache, zkMap, needVerifyStat);
  }

  /**
   * Compares {@code cache} against what is on ZooKeeper under {@code paths}. No paths are
   * excluded from the stat check.
   */
  public static boolean verifyZkCache(List<String> paths, Map<String, ZNode> cache,
      HelixZkClient zkclient, boolean needVerifyStat) {
    return verifyZkCache(paths, null, cache, zkclient, needVerifyStat);
  }

  /**
   * Compares {@code cache} against what is on ZooKeeper under {@code paths}.
   */
  public static boolean verifyZkCache(List<String> paths, List<String> pathsExcludeForStat,
      Map<String, ZNode> cache, HelixZkClient zkclient, boolean needVerifyStat) {
    // read everything on zk under paths
    Map<String, ZNode> zkMap = new HashMap<String, ZNode>();
    for (String path : paths) {
      readZkRecursive(path, zkMap, zkclient);
    }

    return verifyZkCache(paths, pathsExcludeForStat, cache, zkMap, needVerifyStat);
  }

  /**
   * Returns true if {@code cache} and {@code zkMap} hold the same paths with equal data and
   * child sets. When {@code needVerifyStat} is true, stats must also match, except for paths
   * listed in {@code pathsExcludeForStat}. A null exclusion list excludes nothing. The first
   * mismatch found is printed to stderr.
   */
  public static boolean verifyZkCache(List<String> paths, List<String> pathsExcludeForStat,
      Map<String, ZNode> cache, Map<String, ZNode> zkMap, boolean needVerifyStat) {
    // equal size
    if (zkMap.size() != cache.size()) {
      System.err
          .println("size mismatch: cacheSize: " + cache.size() + ", zkMapSize: " + zkMap.size());
      System.out.println("cache: (" + cache.size() + ")");
      TestHelper.printCache(cache);

      System.out.println("zkMap: (" + zkMap.size() + ")");
      TestHelper.printCache(zkMap);

      return false;
    }

    // everything in cache is also in map
    for (Map.Entry<String, ZNode> entry : cache.entrySet()) {
      String path = entry.getKey();
      ZNode cacheNode = entry.getValue();
      ZNode zkNode = zkMap.get(path);

      if (zkNode == null) {
        // in cache but not on zk
        System.err.println("path: " + path + " in cache but not on zk: inCacheNode: " + cacheNode);
        return false;
      }

      if (!nullSafeEquals(zkNode.getData(), cacheNode.getData())) {
        // data not equal
        System.err.println(
            "data mismatch on path: " + path + ", inCache: " + cacheNode.getData() + ", onZk: "
                + zkNode.getData());
        return false;
      }

      if (!nullSafeEquals(zkNode.getChildSet(), cacheNode.getChildSet())) {
        // childSet not equal
        System.err.println(
            "childSet mismatch on path: " + path + ", inCache: " + cacheNode.getChildSet()
                + ", onZk: " + zkNode.getChildSet());
        return false;
      }

      // A null exclusion list means "exclude nothing". Requiring it to be non-null would make
      // needVerifyStat a no-op for the overloads that pass null.
      boolean excludedFromStat = pathsExcludeForStat != null && pathsExcludeForStat.contains(path);
      if (needVerifyStat && !excludedFromStat) {
        if (cacheNode.getStat() == null || !cacheNode.getStat().equals(zkNode.getStat())) {
          // stat not equal
          System.err.println(
              "Stat mismatch on path: " + path + ", inCache: " + cacheNode.getStat() + ", onZk: "
                  + zkNode.getStat());
          return false;
        }
      }
    }

    return true;
  }

  /**
   * Null-tolerant equality: two nulls are equal; otherwise it delegates to {@code a.equals(b)}.
   */
  private static boolean nullSafeEquals(Object a, Object b) {
    return a == null ? b == null : a.equals(b);
  }

  /**
   * Builds the "Bootstrap" state model with states ONLINE, BOOTSTRAP, OFFLINE, IDLE, DROPPED and
   * ERROR; IDLE is the initial state.
   */
  public static StateModelDefinition generateStateModelDefForBootstrap() {
    ZNRecord record = new ZNRecord("Bootstrap");
    record.setSimpleField(StateModelDefinitionProperty.INITIAL_STATE.toString(), "IDLE");
    List<String> statePriorityList = new ArrayList<String>();
    statePriorityList.add("ONLINE");
    statePriorityList.add("BOOTSTRAP");
    statePriorityList.add("OFFLINE");
    statePriorityList.add("IDLE");
    statePriorityList.add("DROPPED");
    statePriorityList.add("ERROR");
    record.setListField(StateModelDefinitionProperty.STATE_PRIORITY_LIST.toString(),
        statePriorityList);

    // ONLINE uses the all-replicas count; every other state uses "-1".
    for (String state : statePriorityList) {
      Map<String, String> metadata = new HashMap<String, String>();
      metadata.put("count",
          state.equals("ONLINE") ? StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS : "-1");
      record.setMapField(state + ".meta", metadata);
    }

    // "<state>.next" maps each target state to the next hop on the way there.
    for (String state : statePriorityList) {
      String key = state + ".next";
      if (state.equals("ONLINE")) {
        Map<String, String> metadata = new HashMap<String, String>();
        metadata.put("BOOTSTRAP", "OFFLINE");
        metadata.put("OFFLINE", "OFFLINE");
        metadata.put("DROPPED", "OFFLINE");
        metadata.put("IDLE", "OFFLINE");
        record.setMapField(key, metadata);
      } else if (state.equals("BOOTSTRAP")) {
        Map<String, String> metadata = new HashMap<String, String>();
        metadata.put("ONLINE", "ONLINE");
        metadata.put("OFFLINE", "OFFLINE");
        metadata.put("DROPPED", "OFFLINE");
        metadata.put("IDLE", "OFFLINE");
        record.setMapField(key, metadata);
      } else if (state.equals("OFFLINE")) {
        Map<String, String> metadata = new HashMap<String, String>();
        metadata.put("ONLINE", "BOOTSTRAP");
        metadata.put("BOOTSTRAP", "BOOTSTRAP");
        metadata.put("DROPPED", "IDLE");
        metadata.put("IDLE", "IDLE");
        record.setMapField(key, metadata);
      } else if (state.equals("IDLE")) {
        Map<String, String> metadata = new HashMap<String, String>();
        metadata.put("ONLINE", "OFFLINE");
        metadata.put("BOOTSTRAP", "OFFLINE");
        metadata.put("OFFLINE", "OFFLINE");
        metadata.put("DROPPED", "DROPPED");
        record.setMapField(key, metadata);
      } else if (state.equals("ERROR")) {
        Map<String, String> metadata = new HashMap<String, String>();
        metadata.put("IDLE", "IDLE");
        record.setMapField(key, metadata);
      }
    }
    List<String> stateTransitionPriorityList = new ArrayList<String>();
    stateTransitionPriorityList.add("ONLINE-OFFLINE");
    stateTransitionPriorityList.add("BOOTSTRAP-ONLINE");
    stateTransitionPriorityList.add("OFFLINE-BOOTSTRAP");
    stateTransitionPriorityList.add("BOOTSTRAP-OFFLINE");
    stateTransitionPriorityList.add("OFFLINE-IDLE");
    stateTransitionPriorityList.add("IDLE-OFFLINE");
    stateTransitionPriorityList.add("IDLE-DROPPED");
    stateTransitionPriorityList.add("ERROR-IDLE");
    record.setListField(StateModelDefinitionProperty.STATE_TRANSITION_PRIORITYLIST.toString(),
        stateTransitionPriorityList);
    return new StateModelDefinition(record);
  }

  /**
   * Renders a record as human-readable text: its id, then its simple, list and map fields.
   * Null field groups are skipped.
   */
  public static String znrecordToString(ZNRecord record) {
    StringBuilder sb = new StringBuilder();
    sb.append(record.getId()).append("\n");

    Map<String, String> simpleFields = record.getSimpleFields();
    if (simpleFields != null) {
      sb.append("simpleFields\n");
      for (Map.Entry<String, String> entry : simpleFields.entrySet()) {
        sb.append("  ").append(entry.getKey()).append("\t: ").append(entry.getValue())
            .append("\n");
      }
    }

    Map<String, List<String>> listFields = record.getListFields();
    if (listFields != null) {
      sb.append("listFields\n");
      for (Map.Entry<String, List<String>> entry : listFields.entrySet()) {
        sb.append("  ").append(entry.getKey()).append("\t: ");
        for (String listValue : entry.getValue()) {
          sb.append(listValue).append(", ");
        }
        sb.append("\n");
      }
    }

    Map<String, Map<String, String>> mapFields = record.getMapFields();
    if (mapFields != null) {
      sb.append("mapFields\n");
      for (Map.Entry<String, Map<String, String>> entry : mapFields.entrySet()) {
        sb.append("  ").append(entry.getKey()).append("\t: \n");
        for (Map.Entry<String, String> inner : entry.getValue().entrySet()) {
          sb.append("    ").append(inner.getKey()).append("\t: ").append(inner.getValue())
              .append("\n");
        }
      }
    }

    return sb.toString();
  }

  /** A condition polled by {@link #verify(Verifier, long)}. */
  @FunctionalInterface
  public interface Verifier {
    boolean verify() throws Exception;
  }

  /**
   * Polls {@code verifier} every 50 ms until it returns true or more than {@code timeout}
   * milliseconds have elapsed.
   *
   * @return the last result of the verifier
   * @throws Exception anything thrown by the verifier, or an interrupt during the sleep
   */
  public static boolean verify(Verifier verifier, long timeout) throws Exception {
    long start = System.currentTimeMillis();
    do {
      boolean result = verifier.verify();
      boolean isTimedout = (System.currentTimeMillis() - start) > timeout;
      if (result || isTimedout) {
        if (isTimedout && !result) {
          LOG.error("verifier time out, consider try longer timeout, stack trace{}",
              Arrays.asList(Thread.currentThread().getStackTrace()));
        }
        return result;
      }
      Thread.sleep(50);
    } while (true);
  }
}