This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch helix-gateway-service
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/helix-gateway-service by this
push:
new 98138c4f5 Build HelixGateway integration test base (#2842)
98138c4f5 is described below
commit 98138c4f5a52dd35ee00c79b011742758e4be8f9
Author: Junkai Xue <[email protected]>
AuthorDate: Mon Jul 22 13:27:43 2024 -0700
Build HelixGateway integration test base (#2842)
This test base contains all the setups including:
1. Mock participants which are combined with gateway services.
2. Controller with start/stop.
3. Resource creation.
4. Only Online/Offline state model.
5. Gateway services with start/stop (now leave empty until gateway service
is implemented).
Also starts a dummy test extending the test base, to make sure all setups are
ready except the gateway services.
---
helix-gateway/src/test/conf/testng.xml | 2 +-
.../java/org/apache/helix/gateway/DummyTest.java | 21 +
.../helix/gateway/base/HelixGatewayTestBase.java | 141 ++++
.../helix/gateway/base/ZookeeperTestBase.java | 814 ++++++++++++++++++++
.../base/manager/ClusterControllerManager.java | 49 ++
.../helix/gateway/base/manager/ClusterManager.java | 135 ++++
.../base/manager/MockParticipantManager.java | 118 +++
.../base/statemodel/MockOFModelFactory.java | 54 ++
.../gateway/base/statemodel/MockOFStateModel.java | 65 ++
.../gateway/base/statemodel/MockTransition.java | 41 ++
.../apache/helix/gateway/base/util/TestHelper.java | 817 +++++++++++++++++++++
11 files changed, 2256 insertions(+), 1 deletion(-)
diff --git a/helix-gateway/src/test/conf/testng.xml
b/helix-gateway/src/test/conf/testng.xml
index 19446d49b..f77eab885 100644
--- a/helix-gateway/src/test/conf/testng.xml
+++ b/helix-gateway/src/test/conf/testng.xml
@@ -21,7 +21,7 @@
<suite name="Suite" parallel="false">
<test name="Test" preserve-order="true">
<packages>
- <package name="org.apache.helix.helix.gateway.*"/>
+ <package name="org.apache.helix.gateway.*"/>
</packages>
</test>
</suite>
diff --git
a/helix-gateway/src/test/java/org/apache/helix/gateway/DummyTest.java
b/helix-gateway/src/test/java/org/apache/helix/gateway/DummyTest.java
new file mode 100644
index 000000000..7511cb3a3
--- /dev/null
+++ b/helix-gateway/src/test/java/org/apache/helix/gateway/DummyTest.java
@@ -0,0 +1,21 @@
+package org.apache.helix.gateway;
+
+import org.apache.helix.gateway.base.HelixGatewayTestBase;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class DummyTest extends HelixGatewayTestBase {
+
+ @BeforeClass
+ public void beforeClass() {
+ _numParticipants = 5;
+ super.beforeClass();
+ }
+
+ @Test
+ public void testSetups() {
+ createResource("TestDB_1", 4, 2);
+ Assert.assertTrue(_clusterVerifier.verify());
+ }
+}
diff --git
a/helix-gateway/src/test/java/org/apache/helix/gateway/base/HelixGatewayTestBase.java
b/helix-gateway/src/test/java/org/apache/helix/gateway/base/HelixGatewayTestBase.java
new file mode 100644
index 000000000..06cf45cd1
--- /dev/null
+++
b/helix-gateway/src/test/java/org/apache/helix/gateway/base/HelixGatewayTestBase.java
@@ -0,0 +1,141 @@
+package org.apache.helix.gateway.base;
+
+import java.util.List;
+
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.gateway.base.manager.ClusterControllerManager;
+import org.apache.helix.gateway.base.manager.MockParticipantManager;
+import org.apache.helix.gateway.base.util.TestHelper;
+import
org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.collections.Lists;
+
+public class HelixGatewayTestBase extends ZookeeperTestBase {
+ protected static final int START_PORT = 12918;
+ protected String _clusterName = "TEST_CLUSTER"; // change the cluster name
for each test class
+ protected int _numParticipants = 3;
+ protected int _numGatewayInstances = 3;
+ protected ClusterControllerManager _controller;
+ protected List<MockParticipantManager> _participants;
+ protected ConfigAccessor _configAccessor;
+ protected BestPossibleExternalViewVerifier _clusterVerifier;
+
+ @BeforeClass
+ public void beforeClass() {
+ _participants = Lists.newArrayList();
+ _configAccessor = new ConfigAccessor(ZK_ADDR);
+ _gSetupTool.getClusterManagementTool().addCluster(_clusterName, true);
+ controllerManagement(true);
+ startParticipants();
+ startGatewayService();
+ }
+
+ @AfterClass
+ public void afterClass() {
+ controllerManagement(false);
+ stopParticipants(true);
+ stopGatewayService();
+ _gSetupTool.getClusterManagementTool().dropCluster(_clusterName);
+ }
+
+
+ /**
+ * Start or stop the controller
+ * @param start true to start the controller, false to stop the controller
+ */
+ private void controllerManagement(boolean start) {
+ String controllerName = CONTROLLER_PREFIX + "_0";
+
+ if (start) {
+ _controller = new ClusterControllerManager(ZK_ADDR, _clusterName,
controllerName);
+ _controller.syncStart();
+
+ _clusterVerifier =
+ new
BestPossibleExternalViewVerifier.Builder(_clusterName).setZkClient(_gZkClient).setWaitTillVerify(
+ TestHelper.DEFAULT_REBALANCE_PROCESSING_WAIT_TIME).build();
+ } else {
+ _controller.syncStop();
+ }
+
+ enablePersistBestPossibleAssignment(_gZkClient, _clusterName, start);
+ }
+
+ /**
+ * Create participants with the given number of participants defined by
_numParticipants
+ */
+ private void startParticipants() {
+ for (int i = 0; i < _numParticipants; i++) {
+ String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i);
+ _gSetupTool.addInstanceToCluster(_clusterName, storageNodeName);
+
+ // start dummy participants
+ MockParticipantManager participant =
+ new MockParticipantManager(ZK_ADDR, _clusterName, storageNodeName);
+ participant.syncStart();
+ _participants.add(participant);
+ }
+ }
+
+ /**
+ * Stop participants and optionally drop the participants
+ * if dropParticipants is true
+ *
+ * @param dropParticipants true to drop the participants, false to stop the
participants
+ */
+ private void stopParticipants(boolean dropParticipants) {
+ for (MockParticipantManager participant : _participants) {
+ participant.syncStop();
+ if (dropParticipants) {
+ _gSetupTool.getClusterManagementTool().dropInstance(_clusterName,
+ _configAccessor.getInstanceConfig(_clusterName,
participant.getInstanceName()));
+ }
+ }
+ _participants.clear();
+ }
+
+ /**
+ * Create a resource with the given number of partitions and replicas
+ * WARNING: 1) assume only support OnlineOffline state model
+ * 2) assume only support FULL_AUTO rebalance mode
+ *
+ * Default rebalance strategy is CrushEdRebalanceStrategy
+ *
+ * @param resourceName name of the resource
+ * @param numPartitions number of partitions
+ * @param numReplicas number of replicas
+ */
+
+ protected void createResource(String resourceName, int numPartitions, int
numReplicas) {
+ createResource(resourceName, numPartitions, numReplicas,
"org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy");}
+
+ /**
+ * Create a resource with the given number of partitions, replicas and
rebalance strategy
+ *
+ * @param resourceName name of the resource
+ * @param numPartitions number of partitions
+ * @param numReplicas number of replicas
+ * @param rebalanceStrategy rebalance strategy
+ */
+ protected void createResource(String resourceName, int numPartitions, int
numReplicas, String rebalanceStrategy) {
+ _gSetupTool.getClusterManagementTool().addResource(_clusterName,
resourceName, numPartitions, "OnlineOffline",
+ "FULL_AUTO", rebalanceStrategy);
+ _gSetupTool.getClusterManagementTool().rebalance(_clusterName,
resourceName, numReplicas);
+ }
+
+ /**
+ * Start the gateway service with the given number of gateway instances
+ * defined by _numGatewayInstances
+ */
+ protected void startGatewayService() {
+ // Start the gateway service
+ }
+
+ /**
+ * Stop the gateway service
+ */
+ protected void stopGatewayService() {
+ // Stop the gateway service
+
+ }
+}
diff --git
a/helix-gateway/src/test/java/org/apache/helix/gateway/base/ZookeeperTestBase.java
b/helix-gateway/src/test/java/org/apache/helix/gateway/base/ZookeeperTestBase.java
new file mode 100644
index 000000000..8d87bc4f3
--- /dev/null
+++
b/helix-gateway/src/test/java/org/apache/helix/gateway/base/ZookeeperTestBase.java
@@ -0,0 +1,814 @@
+package org.apache.helix.gateway.base;
+
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.reflect.Method;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.logging.Level;
+import javax.management.MBeanServerConnection;
+import javax.management.ObjectName;
+
+import com.google.common.base.Preconditions;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.ConfigAccessor;
+import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixProperty;
+import org.apache.helix.PropertyKey;
+import org.apache.helix.PropertyPathBuilder;
+import org.apache.helix.SystemPropertyKeys;
+import org.apache.helix.api.config.HelixConfigProperty;
+import org.apache.helix.constants.InstanceConstants;
+import org.apache.helix.controller.pipeline.AbstractAsyncBaseStage;
+import org.apache.helix.controller.pipeline.Pipeline;
+import org.apache.helix.controller.pipeline.Stage;
+import org.apache.helix.controller.pipeline.StageContext;
+import org.apache.helix.controller.rebalancer.DelayedAutoRebalancer;
+import org.apache.helix.controller.rebalancer.strategy.AutoRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.waged.WagedRebalancer;
+import org.apache.helix.controller.stages.AttributeName;
+import org.apache.helix.controller.stages.ClusterEvent;
+import org.apache.helix.gateway.base.util.TestHelper;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.ClusterConfig;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.OnlineOfflineSMD;
+import org.apache.helix.model.ResourceConfig;
+import org.apache.helix.model.StateModelDefinition;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.tools.ClusterStateVerifier;
+import org.apache.helix.tools.StateModelConfigGenerator;
+import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.impl.client.ZkClient;
+import org.apache.helix.zookeeper.impl.factory.DedicatedZkClientFactory;
+import org.apache.helix.zookeeper.zkclient.ZkServer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.ITestContext;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.AfterSuite;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.BeforeSuite;
+
/**
 * Base class that boots in-memory ZooKeeper server(s) for integration tests
 * and exposes shared Helix tooling (ZK client, ClusterSetup, data accessors).
 * A multi-ZK mode is driven by the "multiZk"/"numZk" system properties.
 */
public abstract class ZookeeperTestBase {
  private static final Logger LOG = LoggerFactory.getLogger(ZookeeperTestBase.class);
  private static final String MULTI_ZK_PROPERTY_KEY = "multiZk";
  private static final String NUM_ZK_PROPERTY_KEY = "numZk";

  // Single-ZK convenience references; set in beforeSuite() to the entries of
  // the multi-ZK maps below keyed by ZK_ADDR.
  protected static ZkServer _zkServer;
  protected static HelixZkClient _gZkClient;
  protected static ClusterSetup _gSetupTool;
  protected static BaseDataAccessor<ZNRecord> _baseAccessor;
  protected static MBeanServerConnection _server = ManagementFactory.getPlatformMBeanServer();

  // cluster name -> (instance name -> ZK client owning that fake live instance)
  private final Map<String, Map<String, HelixZkClient>> _liveInstanceOwners = new HashMap<>();

  private static final String ZK_PREFIX = "localhost:";
  private static final int ZK_START_PORT = 2283;
  public static final String ZK_ADDR = ZK_PREFIX + ZK_START_PORT;
  protected static final String CLUSTER_PREFIX = "CLUSTER";
  protected static final String CONTROLLER_CLUSTER_PREFIX = "CONTROLLER_CLUSTER";
  protected final String CONTROLLER_PREFIX = "controller";
  protected final String PARTICIPANT_PREFIX = "localhost";
  // Settle time (ms) after a manual GC in beforeClass().
  private static final long MANUAL_GC_PAUSE = 4000L;

  /*
   * Multiple ZK references
   */
  // The following maps hold ZK connect string as keys
  protected static final Map<String, ZkServer> _zkServerMap = new HashMap<>();
  protected static final Map<String, HelixZkClient> _helixZkClientMap = new HashMap<>();
  protected static final Map<String, ClusterSetup> _clusterSetupMap = new HashMap<>();
  protected static final Map<String, BaseDataAccessor> _baseDataAccessorMap = new HashMap<>();
+
+ static public void reportPhysicalMemory() {
+ com.sun.management.OperatingSystemMXBean os =
(com.sun.management.OperatingSystemMXBean)
+ java.lang.management.ManagementFactory.getOperatingSystemMXBean();
+ long physicalMemorySize = os.getTotalPhysicalMemorySize();
+ System.out.println("************ SYSTEM Physical Memory:" +
physicalMemorySize);
+
+ long MB = 1024 * 1024;
+ Runtime runtime = Runtime.getRuntime();
+ long free = runtime.freeMemory()/MB;
+ long total = runtime.totalMemory()/MB;
+ System.out.println("************ total memory:" + total + " free memory:"
+ free);
+ }
+
  /**
   * One-time suite setup: configures JUL logging, starts one or more
   * in-memory ZooKeeper servers (more when "multiZk"/"numZk" system
   * properties are set), wires the single-ZK convenience references, and
   * clears stale JMX MBeans left over from previous suites.
   */
  @BeforeSuite
  public void beforeSuite() throws Exception {
    // TODO: use logging.properties file to config java.util.logging.Logger levels
    java.util.logging.Logger topJavaLogger = java.util.logging.Logger.getLogger("");
    topJavaLogger.setLevel(Level.WARNING);

    // Due to ZOOKEEPER-2693 fix, we need to specify whitelist for execute zk commends
    System.setProperty("zookeeper.4lw.commands.whitelist", "*");
    System.setProperty(SystemPropertyKeys.CONTROLLER_MESSAGE_PURGE_DELAY, "3000");

    // Start in-memory ZooKeepers
    // If multi-ZooKeeper is enabled, start more ZKs. Otherwise, just set up one ZK
    int numZkToStart = 1;
    String multiZkConfig = System.getProperty(MULTI_ZK_PROPERTY_KEY);
    if (multiZkConfig != null && multiZkConfig.equalsIgnoreCase(Boolean.TRUE.toString())) {
      String numZkFromConfig = System.getProperty(NUM_ZK_PROPERTY_KEY);
      if (numZkFromConfig != null) {
        try {
          // Never start fewer than one ZK even if the config asks for less.
          numZkToStart = Math.max(Integer.parseInt(numZkFromConfig), numZkToStart);
        } catch (Exception e) {
          Assert.fail("Failed to parse the number of ZKs from config!");
        }
      } else {
        Assert.fail("multiZk config is set but numZk config is missing!");
      }
    }

    // Start "numZkFromConfigInt" ZooKeepers
    for (int i = 0; i < numZkToStart; i++) {
      startZooKeeper(i);
    }

    // Set the references for backward-compatibility with a single ZK environment
    _zkServer = _zkServerMap.get(ZK_ADDR);
    _gZkClient = _helixZkClientMap.get(ZK_ADDR);
    _gSetupTool = _clusterSetupMap.get(ZK_ADDR);
    _baseAccessor = _baseDataAccessorMap.get(ZK_ADDR);

    // Clean up all JMX objects
    for (ObjectName mbean : _server.queryNames(null, null)) {
      try {
        _server.unregisterMBean(mbean);
      } catch (Exception e) {
        // OK — best-effort cleanup; the MBean may already be unregistered.
      }
    }
  }
+
  /**
   * Starts an additional in-memory ZooKeeper for testing.
   * @param i index to be added to the ZK port to avoid conflicts
   * @throws Exception
   */
  private static synchronized void startZooKeeper(int i) {
    String zkAddress = ZK_PREFIX + (ZK_START_PORT + i);
    // computeIfAbsent makes repeated calls for the same address idempotent.
    _zkServerMap.computeIfAbsent(zkAddress, ZookeeperTestBase::createZookeeperServer);
    _helixZkClientMap.computeIfAbsent(zkAddress, ZookeeperTestBase::createZkClient);
    _clusterSetupMap.computeIfAbsent(zkAddress,
        key -> new ClusterSetup(_helixZkClientMap.get(key)));
    _baseDataAccessorMap.computeIfAbsent(zkAddress,
        key -> new ZkBaseDataAccessor(_helixZkClientMap.get(key)));
  }

  // Starts an in-memory ZK server at the given address; wraps any startup
  // failure in an unchecked exception so this can be used as a method ref.
  private static ZkServer createZookeeperServer(String zkAddress) {
    try {
      return Preconditions.checkNotNull(TestHelper.startZkServer(zkAddress));
    } catch (Exception e) {
      throw new IllegalArgumentException("Failed to start zookeeper server at " + zkAddress, e);
    }
  }

  // Builds a dedicated ZK client with ZNRecord serialization for the address.
  private static HelixZkClient createZkClient(String zkAddress) {
    HelixZkClient.ZkClientConfig clientConfig = new HelixZkClient.ZkClientConfig();
    clientConfig.setZkSerializer(new ZNRecordSerializer());
    return DedicatedZkClientFactory.getInstance()
        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddress), clientConfig);
  }
+
  /**
   * One-time suite teardown: best-effort JMX cleanup, then closes all ZK
   * accessors/clients and stops every in-memory ZK server.
   * @throws IOException if querying the MBean server fails
   */
  @AfterSuite
  public void afterSuite() throws IOException {
    // Clean up all JMX objects
    for (ObjectName mbean : _server.queryNames(null, null)) {
      try {
        _server.unregisterMBean(mbean);
      } catch (Exception e) {
        // OK
      }
    }

    // Synchronized on the class, matching startZooKeeper which mutates the
    // same static maps.
    synchronized (ZookeeperTestBase.class) {
      // Close all ZK resources
      _baseDataAccessorMap.values().forEach(BaseDataAccessor::close);
      _clusterSetupMap.values().forEach(ClusterSetup::close);
      _helixZkClientMap.values().forEach(HelixZkClient::close);
      _zkServerMap.values().forEach(TestHelper::stopZkServer);
    }
  }
+
  /**
   * Per-class setup: clears stale MBeans, then forces a GC and pauses so each
   * test class starts from a quieter JVM.
   */
  @BeforeClass
  public void beforeClass() throws Exception {
    cleanupJMXObjects();
    // Giving each test some time to settle (such as gc pause, etc).
    // Note that this is the best effort we could make to stabilize tests, not a complete solution
    Runtime.getRuntime().gc();
    Thread.sleep(MANUAL_GC_PAUSE);
  }

  /** Records the test start time on the TestNG context for later reporting. */
  @BeforeMethod
  public void beforeTest(Method testMethod, ITestContext testContext) {
    testContext.setAttribute("StartTime", System.currentTimeMillis());
  }

  /** Best-effort unregistration of every MBean on the platform MBean server. */
  protected void cleanupJMXObjects() throws IOException {
    // Clean up all JMX objects
    for (ObjectName mbean : _server.queryNames(null, null)) {
      try {
        _server.unregisterMBean(mbean);
      } catch (Exception e) {
        // OK
      }
    }
  }
+
+ protected String getShortClassName() {
+ return this.getClass().getSimpleName();
+ }
+
+ protected String getCurrentLeader(HelixZkClient zkClient, String
clusterName) {
+ ZKHelixDataAccessor accessor =
+ new ZKHelixDataAccessor(clusterName, new
ZkBaseDataAccessor<>(zkClient));
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+ LiveInstance leader = accessor.getProperty(keyBuilder.controllerLeader());
+ if (leader == null) {
+ return null;
+ }
+ return leader.getInstanceName();
+ }
+
+ protected void enablePersistBestPossibleAssignment(HelixZkClient zkClient,
String clusterName,
+ Boolean enabled) {
+ ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
+ ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
+ clusterConfig.setPersistBestPossibleAssignment(enabled);
+ configAccessor.setClusterConfig(clusterName, clusterConfig);
+ }
+
+ protected void enablePersistIntermediateAssignment(HelixZkClient zkClient,
String clusterName,
+ Boolean enabled) {
+ ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
+ ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
+ clusterConfig.setPersistIntermediateAssignment(enabled);
+ configAccessor.setClusterConfig(clusterName, clusterConfig);
+ }
+
+ protected void enableTopologyAwareRebalance(HelixZkClient zkClient, String
clusterName,
+ Boolean enabled) {
+ ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
+ ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
+ clusterConfig.setTopologyAwareEnabled(enabled);
+ configAccessor.setClusterConfig(clusterName, clusterConfig);
+ }
+
  /** Toggles delayed rebalancing at the cluster level. */
  protected void enableDelayRebalanceInCluster(HelixZkClient zkClient, String clusterName,
      boolean enabled) {
    ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
    ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
    clusterConfig.setDelayRebalaceEnabled(enabled);
    configAccessor.setClusterConfig(clusterName, clusterConfig);
  }

  /** Toggles delayed rebalancing for a single instance. */
  protected void enableDelayRebalanceInInstance(HelixZkClient zkClient, String clusterName,
      String instanceName, boolean enabled) {
    ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
    InstanceConfig instanceConfig = configAccessor.getInstanceConfig(clusterName, instanceName);
    instanceConfig.setDelayRebalanceEnabled(enabled);
    configAccessor.setInstanceConfig(clusterName, instanceName, instanceConfig);
  }

  /** Toggles delayed rebalancing at the cluster level and sets the delay (ms). */
  protected void enableDelayRebalanceInCluster(HelixZkClient zkClient, String clusterName,
      boolean enabled, long delay) {
    ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
    ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
    clusterConfig.setDelayRebalaceEnabled(enabled);
    clusterConfig.setRebalanceDelayTime(delay);
    configAccessor.setClusterConfig(clusterName, clusterConfig);
  }

  /**
   * Enables or disables P2P messaging in the cluster config. Disabling
   * removes the field entirely rather than setting it to false.
   */
  protected void enableP2PInCluster(String clusterName, ConfigAccessor configAccessor,
      boolean enable) {
    // enable p2p message in cluster.
    if (enable) {
      ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
      clusterConfig.enableP2PMessage(true);
      configAccessor.setClusterConfig(clusterName, clusterConfig);
    } else {
      ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
      clusterConfig.getRecord().getSimpleFields()
          .remove(HelixConfigProperty.P2P_MESSAGE_ENABLED.name());
      configAccessor.setClusterConfig(clusterName, clusterConfig);
    }
  }

  /**
   * Enables or disables P2P messaging on a single resource. On disable the
   * field is removed from the resource config, if one exists.
   */
  protected void enableP2PInResource(String clusterName, ConfigAccessor configAccessor,
      String dbName, boolean enable) {
    if (enable) {
      ResourceConfig resourceConfig =
          new ResourceConfig.Builder(dbName).setP2PMessageEnabled(true).build();
      configAccessor.setResourceConfig(clusterName, dbName, resourceConfig);
    } else {
      // remove P2P Message in resource config
      ResourceConfig resourceConfig = configAccessor.getResourceConfig(clusterName, dbName);
      if (resourceConfig != null) {
        resourceConfig.getRecord().getSimpleFields()
            .remove(HelixConfigProperty.P2P_MESSAGE_ENABLED.name());
        configAccessor.setResourceConfig(clusterName, dbName, resourceConfig);
      }
    }
  }

  /** Sets the cluster-wide rebalance delay (ms). */
  protected void setDelayTimeInCluster(HelixZkClient zkClient, String clusterName, long delay) {
    ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
    ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
    clusterConfig.setRebalanceDelayTime(delay);
    configAccessor.setClusterConfig(clusterName, clusterConfig);
  }

  /** Records the timestamp of the last on-demand rebalance in cluster config. */
  protected void setLastOnDemandRebalanceTimeInCluster(HelixZkClient zkClient,
      String clusterName, long lastOnDemandTime) {
    ConfigAccessor configAccessor = new ConfigAccessor(zkClient);
    ClusterConfig clusterConfig = configAccessor.getClusterConfig(clusterName);
    clusterConfig.setLastOnDemandRebalanceTimestamp(lastOnDemandTime);
    configAccessor.setClusterConfig(clusterName, clusterConfig);
  }
+
  /**
   * Creates (or updates) a FULL_AUTO resource using the DelayedAutoRebalancer
   * with the default AutoRebalanceStrategy.
   */
  protected IdealState createResourceWithDelayedRebalance(String clusterName, String db,
      String stateModel, int numPartition, int replica, int minActiveReplica, long delay) {
    return createResourceWithDelayedRebalance(clusterName, db, stateModel, numPartition, replica,
        minActiveReplica, delay, AutoRebalanceStrategy.class.getName());
  }

  /** Creates (or updates) a FULL_AUTO resource using the DelayedAutoRebalancer. */
  protected IdealState createResourceWithDelayedRebalance(String clusterName, String db,
      String stateModel, int numPartition, int replica, int minActiveReplica, long delay,
      String rebalanceStrategy) {
    return createResource(clusterName, db, stateModel, numPartition, replica, minActiveReplica,
        delay, DelayedAutoRebalancer.class.getName(), rebalanceStrategy);
  }

  /** Creates (or updates) a FULL_AUTO resource using the WAGED rebalancer. */
  protected IdealState createResourceWithWagedRebalance(String clusterName, String db,
      String stateModel, int numPartition, int replica, int minActiveReplica) {
    return createResource(clusterName, db, stateModel, numPartition, replica, minActiveReplica,
        -1, WagedRebalancer.class.getName(), null);
  }

  /**
   * Common FULL_AUTO resource creation: adds the resource if absent, applies
   * min-active/delay/rebalancer settings, then triggers a rebalance.
   *
   * @param delay rebalance delay in ms; only applied when positive
   * @return the ideal state re-read after the rebalance
   */
  private IdealState createResource(String clusterName, String db, String stateModel,
      int numPartition, int replica, int minActiveReplica, long delay, String rebalancerClassName,
      String rebalanceStrategy) {
    IdealState idealState =
        _gSetupTool.getClusterManagementTool().getResourceIdealState(clusterName, db);
    if (idealState == null) {
      _gSetupTool.addResourceToCluster(clusterName, db, numPartition, stateModel,
          IdealState.RebalanceMode.FULL_AUTO + "", rebalanceStrategy);
    }

    idealState = _gSetupTool.getClusterManagementTool().getResourceIdealState(clusterName, db);
    idealState.setMinActiveReplicas(minActiveReplica);
    if (!idealState.isDelayRebalanceEnabled()) {
      idealState.setDelayRebalanceEnabled(true);
    }
    if (delay > 0) {
      idealState.setRebalanceDelay(delay);
    }
    idealState.setRebalancerClassName(rebalancerClassName);
    _gSetupTool.getClusterManagementTool().setResourceIdealState(clusterName, db, idealState);
    _gSetupTool.rebalanceStorageCluster(clusterName, db, replica);
    idealState = _gSetupTool.getClusterManagementTool().getResourceIdealState(clusterName, db);

    return idealState;
  }

  /**
   * Creates an ideal state for a resource group and seeds an initial
   * partition-to-instance ONLINE mapping, rotating the starting node per
   * partition.
   */
  protected IdealState createIdealState(String resourceGroupName, String instanceGroupTag,
      List<String> instanceNames, int numPartition, int replica, String rebalanceMode,
      String stateModelDef) {
    IdealState is = _gSetupTool.createIdealStateForResourceGroup(resourceGroupName,
        instanceGroupTag, numPartition, replica, rebalanceMode, stateModelDef);

    // setup initial partition->instance mapping.
    int nodeIdx = 0;
    int numNode = instanceNames.size();
    assert (numNode >= replica);
    for (int i = 0; i < numPartition; i++) {
      String partitionName = resourceGroupName + "_" + i;
      for (int j = 0; j < replica; j++) {
        is.setPartitionState(partitionName, instanceNames.get((nodeIdx + j) % numNode),
            OnlineOfflineSMD.States.ONLINE.toString());
      }
      nodeIdx++;
    }

    return is;
  }

  /**
   * Creates a SEMI_AUTO resource and pins every partition to the given
   * preference list.
   */
  protected void createDBInSemiAuto(ClusterSetup clusterSetup, String clusterName, String dbName,
      List<String> preferenceList, String stateModelDef, int numPartition, int replica) {
    clusterSetup.addResourceToCluster(clusterName, dbName, numPartition, stateModelDef,
        IdealState.RebalanceMode.SEMI_AUTO.toString());
    clusterSetup.rebalanceStorageCluster(clusterName, dbName, replica);

    IdealState is =
        _gSetupTool.getClusterManagementTool().getResourceIdealState(clusterName, dbName);
    for (String p : is.getPartitionSet()) {
      is.setPreferenceList(p, preferenceList);
    }
    clusterSetup.getClusterManagementTool().setResourceIdealState(clusterName, dbName, is);
  }
+
+ /**
+ * Validate there should be always minimal active replica and top state
replica for each
+ * partition.
+ * Also make sure there is always some partitions with only active replica
count.
+ */
+ protected void validateMinActiveAndTopStateReplica(IdealState is,
ExternalView ev,
+ int minActiveReplica, int numNodes) {
+ StateModelDefinition stateModelDef =
+
BuiltInStateModelDefinitions.valueOf(is.getStateModelDefRef()).getStateModelDefinition();
+ String topState = stateModelDef.getStatesPriorityList().get(0);
+ int replica = Integer.valueOf(is.getReplicas());
+
+ Map<String, Integer> stateCount = stateModelDef.getStateCountMap(numNodes,
replica);
+ Set<String> activeStates = stateCount.keySet();
+
+ for (String partition : is.getPartitionSet()) {
+ Map<String, String> assignmentMap =
ev.getRecord().getMapField(partition);
+ Assert.assertNotNull(assignmentMap,
+ is.getResourceName() + "'s best possible assignment is null for
partition " + partition);
+ Assert.assertTrue(!assignmentMap.isEmpty(),
+ is.getResourceName() + "'s partition " + partition + " has no best
possible map in IS.");
+
+ boolean hasTopState = false;
+ int activeReplica = 0;
+ for (String state : assignmentMap.values()) {
+ if (topState.equalsIgnoreCase(state)) {
+ hasTopState = true;
+ }
+ if (activeStates.contains(state)) {
+ activeReplica++;
+ }
+ }
+
+ if (activeReplica < minActiveReplica) {
+ int a = 0;
+ }
+
+ Assert.assertTrue(hasTopState, String.format("%s missing %s replica",
partition, topState));
+ Assert.assertTrue(activeReplica >= minActiveReplica,
+ String.format("%s has less active replica %d then required %d",
partition, activeReplica,
+ minActiveReplica));
+ }
+ }
+
  /**
   * Runs a single controller pipeline stage against the given event, outside
   * a real pipeline (init → preProcess → process/execute → postProcess).
   */
  protected void runStage(HelixManager manager, ClusterEvent event, Stage stage) throws Exception {
    event.addAttribute(AttributeName.helixmanager.name(), manager);
    StageContext context = new StageContext();
    stage.init(context);
    stage.preProcess();

    // AbstractAsyncBaseStage will run asynchronously, and it's main logics are
    // implemented in execute() function call
    if (stage instanceof AbstractAsyncBaseStage) {
      ((AbstractAsyncBaseStage) stage).execute(event);
    } else {
      stage.process(event);
    }
    stage.postProcess();
  }
+
+ public void verifyInstance(HelixZkClient zkClient, String clusterName,
String instance,
+ boolean wantExists) {
+ // String instanceConfigsPath = HelixUtil.getConfigPath(clusterName);
+ String instanceConfigsPath =
PropertyPathBuilder.instanceConfig(clusterName);
+ String instanceConfigPath = instanceConfigsPath + "/" + instance;
+ String instancePath = PropertyPathBuilder.instance(clusterName, instance);
+ Assert.assertEquals(wantExists, zkClient.exists(instanceConfigPath));
+ Assert.assertEquals(wantExists, zkClient.exists(instancePath));
+ }
+
+ public void verifyResource(HelixZkClient zkClient, String clusterName,
String resource,
+ boolean wantExists) {
+ String resourcePath = PropertyPathBuilder.idealState(clusterName,
resource);
+ Assert.assertEquals(wantExists, zkClient.exists(resourcePath));
+ }
+
+ public void verifyEnabled(HelixZkClient zkClient, String clusterName, String
instance,
+ boolean wantEnabled) {
+ ZKHelixDataAccessor accessor =
+ new ZKHelixDataAccessor(clusterName, new
ZkBaseDataAccessor<ZNRecord>(zkClient));
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+ InstanceConfig config =
accessor.getProperty(keyBuilder.instanceConfig(instance));
+ Assert.assertEquals(wantEnabled, config.getInstanceEnabled());
+ }
+
+ public void verifyReplication(HelixZkClient zkClient, String clusterName,
String resource,
+ int repl) {
+ ZKHelixDataAccessor accessor =
+ new ZKHelixDataAccessor(clusterName, new
ZkBaseDataAccessor<ZNRecord>(zkClient));
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+ IdealState idealState =
accessor.getProperty(keyBuilder.idealStates(resource));
+ for (String partitionName : idealState.getPartitionSet()) {
+ if (idealState.getRebalanceMode() == IdealState.RebalanceMode.SEMI_AUTO)
{
+ Assert.assertEquals(repl,
idealState.getPreferenceList(partitionName).size());
+ } else if (idealState.getRebalanceMode() ==
IdealState.RebalanceMode.CUSTOMIZED) {
+ Assert.assertEquals(repl,
idealState.getInstanceStateMap(partitionName).size());
+ }
+ }
+ }
+
  /**
   * Registers the MasterSlave, LeaderStandby and OnlineOffline state model
   * definitions in the given cluster.
   */
  protected void setupStateModel(String clusterName) {
    ZKHelixDataAccessor accessor =
        new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_gZkClient));
    PropertyKey.Builder keyBuilder = accessor.keyBuilder();

    StateModelDefinition masterSlave =
        new StateModelDefinition(StateModelConfigGenerator.generateConfigForMasterSlave());
    accessor.setProperty(keyBuilder.stateModelDef(masterSlave.getId()), masterSlave);

    StateModelDefinition leaderStandby =
        new StateModelDefinition(StateModelConfigGenerator.generateConfigForLeaderStandby());
    accessor.setProperty(keyBuilder.stateModelDef(leaderStandby.getId()), leaderStandby);

    StateModelDefinition onlineOffline =
        new StateModelDefinition(StateModelConfigGenerator.generateConfigForOnlineOffline());
    accessor.setProperty(keyBuilder.stateModelDef(onlineOffline.getId()), onlineOffline);

  }
+
+ protected Message createMessage(Message.MessageType type, String msgId,
String fromState,
+ String toState, String resourceName, String tgtName) {
+ Message msg = new Message(type.toString(), msgId);
+ msg.setFromState(fromState);
+ msg.setToState(toState);
+
msg.getRecord().setSimpleField(Message.Attributes.RESOURCE_NAME.toString(),
resourceName);
+ msg.setTgtName(tgtName);
+ return msg;
+ }
+
+ protected List<IdealState> setupIdealState(String clusterName, int[] nodes,
String[] resources,
+ int partitions, int replicas) {
+ ZKHelixDataAccessor accessor =
+ new ZKHelixDataAccessor(clusterName, new
ZkBaseDataAccessor<>(_gZkClient));
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+ List<IdealState> idealStates = new ArrayList<>();
+ List<String> instances = new ArrayList<>();
+ for (int i : nodes) {
+ instances.add("localhost_" + i);
+ }
+
+ for (String resourceName : resources) {
+ IdealState idealState = new IdealState(resourceName);
+ for (int p = 0; p < partitions; p++) {
+ List<String> value = new ArrayList<>();
+ for (int r = 0; r < replicas; r++) {
+ int n = nodes[(p + r) % nodes.length];
+ value.add("localhost_" + n);
+ }
+ idealState.getRecord().setListField(resourceName + "_" + p, value);
+ }
+
+ idealState.setReplicas(Integer.toString(replicas));
+ idealState.setStateModelDefRef("MasterSlave");
+ idealState.setRebalanceMode(IdealState.RebalanceMode.SEMI_AUTO);
+ idealState.setNumPartitions(partitions);
+ idealStates.add(idealState);
+
+ // System.out.println(idealState);
+ accessor.setProperty(keyBuilder.idealStates(resourceName), idealState);
+ }
+ return idealStates;
+ }
+
+ @AfterClass
+ public void cleanupLiveInstanceOwners() throws InterruptedException {
+ String testClassName = this.getShortClassName();
+ System.out.println("AfterClass: " + testClassName + " called.");
+ for (String cluster : _liveInstanceOwners.keySet()) {
+ Map<String, HelixZkClient> clientMap = _liveInstanceOwners.get(cluster);
+ for (HelixZkClient client : clientMap.values()) {
+ client.close();
+ }
+ clientMap.clear();
+ }
+ _liveInstanceOwners.clear();
+ }
+
+ protected List<LiveInstance> setupLiveInstances(String clusterName, int[]
liveInstances) {
+ HelixZkClient.ZkClientConfig clientConfig = new
HelixZkClient.ZkClientConfig();
+ clientConfig.setZkSerializer(new ZNRecordSerializer());
+
+ List<LiveInstance> result = new ArrayList<>();
+
+ for (int i = 0; i < liveInstances.length; i++) {
+ String instance = "localhost_" + liveInstances[i];
+ _liveInstanceOwners.putIfAbsent(clusterName, new HashMap<>());
+ Map<String, HelixZkClient> clientMap =
_liveInstanceOwners.get(clusterName);
+ clientMap.putIfAbsent(instance, DedicatedZkClientFactory.getInstance()
+ .buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR),
clientConfig));
+ HelixZkClient client = clientMap.get(instance);
+
+ ZKHelixDataAccessor accessor =
+ new ZKHelixDataAccessor(clusterName, new
ZkBaseDataAccessor<>(client));
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+ LiveInstance liveInstance = new LiveInstance(instance);
+ // Keep setting the session id in the deprecated field for ensure the
same behavior as a real participant.
+ // Note the participant is doing so for backward compatibility.
+ liveInstance.setSessionId(Long.toHexString(client.getSessionId()));
+ // Please refer to the version requirement here:
helix-core/src/main/resources/cluster-manager-version.properties
+ // Ensuring version compatibility can avoid the warning message during
test.
+ liveInstance.setHelixVersion("0.4");
+ accessor.setProperty(keyBuilder.liveInstance(instance), liveInstance);
+ result.add(accessor.getProperty(keyBuilder.liveInstance(instance)));
+ }
+ return result;
+ }
+
+ protected void deleteLiveInstances(String clusterName) {
+ ZKHelixDataAccessor accessor =
+ new ZKHelixDataAccessor(clusterName, new
ZkBaseDataAccessor<>(_gZkClient));
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+
+ Map<String, HelixZkClient> clientMap =
_liveInstanceOwners.getOrDefault(clusterName, Collections.emptyMap());
+
+ for (String liveInstance :
accessor.getChildNames(keyBuilder.liveInstances())) {
+ ZKHelixDataAccessor dataAccessor =
+ new ZKHelixDataAccessor(clusterName, new
ZkBaseDataAccessor<>(_gZkClient));
+ dataAccessor.removeProperty(keyBuilder.liveInstance(liveInstance));
+
+ HelixZkClient client = clientMap.remove(liveInstance);
+ if (client != null) {
+ client.close();
+ }
+ }
+
+ if (clientMap.isEmpty()) {
+ _liveInstanceOwners.remove(clusterName);
+ }
+ }
+
+ protected void setupInstances(String clusterName, int[] instances) {
+ HelixAdmin admin = new ZKHelixAdmin(_gZkClient);
+ for (int i = 0; i < instances.length; i++) {
+ String instance = "localhost_" + instances[i];
+ InstanceConfig instanceConfig = new InstanceConfig(instance);
+ instanceConfig.setHostName("localhost");
+ instanceConfig.setPort("" + instances[i]);
+
instanceConfig.setInstanceOperation(InstanceConstants.InstanceOperation.ENABLE);
+ admin.addInstance(clusterName, instanceConfig);
+ }
+ }
+
+ protected void runPipeline(ClusterEvent event, Pipeline pipeline, boolean
shouldThrowException)
+ throws Exception {
+ try {
+ pipeline.handle(event);
+ pipeline.finish();
+ } catch (Exception e) {
+ if (shouldThrowException) {
+ throw e;
+ } else {
+ LOG.error("Exception while executing pipeline: {}. Will not continue
to next pipeline",
+ pipeline, e);
+ }
+ }
+ }
+
+ protected void runStage(ClusterEvent event, Stage stage) throws Exception {
+ StageContext context = new StageContext();
+ stage.init(context);
+ stage.preProcess();
+
+ // AbstractAsyncBaseStage will run asynchronously, and it's main logics
are implemented in
+ // execute() function call
+ // TODO (harry): duplicated code in ZkIntegrationTestBase, consider moving
runStage()
+ // to a shared library
+ if (stage instanceof AbstractAsyncBaseStage) {
+ ((AbstractAsyncBaseStage) stage).execute(event);
+ } else {
+ stage.process(event);
+ }
+ stage.postProcess();
+ }
+
  /**
   * Drop the given cluster and all of its ZK state via the shared admin tooling.
   * @param clusterName the cluster to delete
   */
  protected void deleteCluster(String clusterName) {
    TestHelper.dropCluster(clusterName, _gZkClient, _gSetupTool);
  }
+
+ /**
+ * Poll for the existence (or lack thereof) of a specific Helix property
+ * @param clazz the HelixProeprty subclass
+ * @param accessor connected HelixDataAccessor
+ * @param key the property key to look up
+ * @param shouldExist true if the property should exist, false otherwise
+ * @return the property if found, or null if it does not exist
+ */
+ protected <T extends HelixProperty> T pollForProperty(Class<T> clazz,
HelixDataAccessor accessor,
+ PropertyKey key, boolean shouldExist) throws InterruptedException {
+ final int POLL_TIMEOUT = 5000;
+ final int POLL_INTERVAL = 50;
+ T property = accessor.getProperty(key);
+ int timeWaited = 0;
+ while (((shouldExist && property == null) || (!shouldExist && property !=
null))
+ && timeWaited < POLL_TIMEOUT) {
+ Thread.sleep(POLL_INTERVAL);
+ timeWaited += POLL_INTERVAL;
+ property = accessor.getProperty(key);
+ }
+ return property;
+ }
+
+ /**
+ * Ensures that external view and current state are empty
+ */
+ protected static class EmptyZkVerifier implements
ClusterStateVerifier.ZkVerifier {
+ private final String _clusterName;
+ private final String _resourceName;
+ private final HelixZkClient _zkClient;
+
+ /**
+ * Instantiate the verifier
+ * @param clusterName the cluster to verify
+ * @param resourceName the resource to verify
+ */
+ public EmptyZkVerifier(String clusterName, String resourceName) {
+ _clusterName = clusterName;
+ _resourceName = resourceName;
+
+ _zkClient = DedicatedZkClientFactory.getInstance()
+ .buildZkClient(new HelixZkClient.ZkConnectionConfig(ZK_ADDR));
+ _zkClient.setZkSerializer(new ZNRecordSerializer());
+ }
+
+ @Override
+ public boolean verify() {
+ BaseDataAccessor<ZNRecord> baseAccessor = new
ZkBaseDataAccessor<ZNRecord>(_zkClient);
+ HelixDataAccessor accessor = new ZKHelixDataAccessor(_clusterName,
baseAccessor);
+ PropertyKey.Builder keyBuilder = accessor.keyBuilder();
+ ExternalView externalView =
accessor.getProperty(keyBuilder.externalView(_resourceName));
+
+ // verify external view empty
+ if (externalView != null) {
+ for (String partition : externalView.getPartitionSet()) {
+ Map<String, String> stateMap = externalView.getStateMap(partition);
+ if (stateMap != null && !stateMap.isEmpty()) {
+ LOG.error("External view not empty for " + partition);
+ return false;
+ }
+ }
+ }
+
+ // verify current state empty
+ List<String> liveParticipants =
accessor.getChildNames(keyBuilder.liveInstances());
+ for (String participant : liveParticipants) {
+ List<String> sessionIds =
accessor.getChildNames(keyBuilder.sessions(participant));
+ for (String sessionId : sessionIds) {
+ CurrentState currentState =
+ accessor.getProperty(keyBuilder.currentState(participant,
sessionId, _resourceName));
+ Map<String, String> partitionStateMap =
currentState.getPartitionStateMap();
+ if (partitionStateMap != null && !partitionStateMap.isEmpty()) {
+ LOG.error("Current state not empty for " + participant);
+ return false;
+ }
+ }
+
+ List<String> taskSessionIds =
+
accessor.getChildNames(keyBuilder.taskCurrentStateSessions(participant));
+ for (String sessionId : taskSessionIds) {
+ CurrentState taskCurrentState = accessor
+ .getProperty(keyBuilder.taskCurrentState(participant, sessionId,
_resourceName));
+ Map<String, String> taskPartitionStateMap =
taskCurrentState.getPartitionStateMap();
+ if (taskPartitionStateMap != null &&
!taskPartitionStateMap.isEmpty()) {
+ LOG.error("Task current state not empty for " + participant);
+ return false;
+ }
+ }
+ }
+ return true;
+ }
+
+ @Override
+ public ZkClient getZkClient() {
+ return (ZkClient) _zkClient;
+ }
+
+ @Override
+ public String getClusterName() {
+ return _clusterName;
+ }
+ }
+
+}
diff --git
a/helix-gateway/src/test/java/org/apache/helix/gateway/base/manager/ClusterControllerManager.java
b/helix-gateway/src/test/java/org/apache/helix/gateway/base/manager/ClusterControllerManager.java
new file mode 100644
index 000000000..4fecde88f
--- /dev/null
+++
b/helix-gateway/src/test/java/org/apache/helix/gateway/base/manager/ClusterControllerManager.java
@@ -0,0 +1,49 @@
+package org.apache.helix.gateway.base.manager;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.HelixManagerProperty;
+import org.apache.helix.InstanceType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * The standalone cluster controller class
+ */
+public class ClusterControllerManager extends ClusterManager {
+ private static Logger LOG =
LoggerFactory.getLogger(ClusterControllerManager.class);
+
+ public ClusterControllerManager(String zkAddr, String clusterName) {
+ this(zkAddr, clusterName, "controller");
+ }
+
+ public ClusterControllerManager(String zkAddr, String clusterName, String
controllerName) {
+ super(zkAddr, clusterName, controllerName, InstanceType.CONTROLLER);
+ }
+
+ public ClusterControllerManager(String clusterName, HelixManagerProperty
helixManagerProperty) {
+ super(clusterName, "controller", InstanceType.CONTROLLER, null, null,
helixManagerProperty);
+ }
+
+ @Override
+ public void finalize() {
+ super.finalize();
+ }
+}
diff --git
a/helix-gateway/src/test/java/org/apache/helix/gateway/base/manager/ClusterManager.java
b/helix-gateway/src/test/java/org/apache/helix/gateway/base/manager/ClusterManager.java
new file mode 100644
index 000000000..3858513e5
--- /dev/null
+++
b/helix-gateway/src/test/java/org/apache/helix/gateway/base/manager/ClusterManager.java
@@ -0,0 +1,135 @@
+package org.apache.helix.gateway.base.manager;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.atomic.AtomicLong;
+
+import org.apache.helix.HelixManagerProperty;
+import org.apache.helix.InstanceType;
+import org.apache.helix.manager.zk.CallbackHandler;
+import org.apache.helix.manager.zk.HelixManagerStateListener;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
 * A {@link ZKHelixManager} whose connect/disconnect lifecycle runs on a dedicated watcher
 * thread, driven synchronously by {@link #syncStart()} and {@link #syncStop()}.
 * The watcher thread connects, then blocks on {@code _stopCountDown} until a stop is
 * requested; callers block on {@code _startCountDown} / {@code _waitStopFinishCountDown}
 * until the corresponding phase completes.
 */
public class ClusterManager extends ZKHelixManager implements Runnable {
  private static Logger LOG = LoggerFactory.getLogger(ClusterManager.class);
  // How long finalize() waits for the watcher thread to exit before warning.
  private static final int DISCONNECT_WAIT_TIME_MS = 3000;

  // Monotonically increasing id so each manager's watcher thread gets a unique name.
  private static AtomicLong UID = new AtomicLong(10000);
  private long _uid;

  private final String _clusterName;
  private final String _instanceName;
  private final InstanceType _type;

  // Released by the watcher thread once connect() has completed (or failed).
  protected CountDownLatch _startCountDown = new CountDownLatch(1);
  // Released by syncStop() to tell the watcher thread to disconnect.
  protected CountDownLatch _stopCountDown = new CountDownLatch(1);
  // Released by the watcher thread after disconnect() has completed.
  protected CountDownLatch _waitStopFinishCountDown = new CountDownLatch(1);

  // Guards against calling syncStart() (and therefore connect()) more than once.
  protected boolean _started = false;

  protected Thread _watcher;

  protected ClusterManager(String zkAddr, String clusterName, String instanceName,
      InstanceType type) {
    super(clusterName, instanceName, type, zkAddr);
    _clusterName = clusterName;
    _instanceName = instanceName;
    _type = type;
    _uid = UID.getAndIncrement();
  }

  protected ClusterManager(String clusterName, String instanceName, InstanceType instanceType,
      String zkAddress, HelixManagerStateListener stateListener,
      HelixManagerProperty helixManagerProperty) {
    super(clusterName, instanceName, instanceType, zkAddress, stateListener, helixManagerProperty);
    _clusterName = clusterName;
    _instanceName = instanceName;
    _type = instanceType;
    _uid = UID.getAndIncrement();
  }

  /**
   * Request the watcher thread to disconnect and block until the disconnect finishes.
   */
  public void syncStop() {
    _stopCountDown.countDown();
    try {
      _waitStopFinishCountDown.await();
      _started = false;
    } catch (InterruptedException e) {
      LOG.error("Interrupted waiting for finish", e);
    }
  }

  /**
   * Start the watcher thread and block until the manager is connected.
   * This should not be called more than once because HelixManager.connect() should not be
   * called more than once.
   */
  public void syncStart() {
    if (_started) {
      throw new RuntimeException(
          "Helix Controller already started. Do not call syncStart() more than once.");
    } else {
      _started = true;
    }

    _watcher = new Thread(this);
    _watcher.setName(String
        .format("ClusterManager_Watcher_%s_%s_%s_%d", _clusterName, _instanceName, _type.name(), _uid));
    LOG.debug("ClusterManager_watcher_{}_{}_{}_{} started, stacktrace {}", _clusterName, _instanceName, _type.name(), _uid, Thread.currentThread().getStackTrace());
    _watcher.start();

    // Wait until the watcher thread has finished connect().
    try {
      _startCountDown.await();
    } catch (InterruptedException e) {
      LOG.error("Interrupted waiting for start", e);
    }
  }

  /**
   * Watcher thread body: connect, wait for a stop request, then disconnect.
   */
  @Override
  public void run() {
    try {
      connect();
      _startCountDown.countDown();
      _stopCountDown.await();
    } catch (Exception e) {
      LOG.error("exception running controller-manager", e);
    } finally {
      // countDown() again so syncStart() is released even if connect() threw;
      // counting down an already-zero latch is a no-op.
      _startCountDown.countDown();
      disconnect();
      _waitStopFinishCountDown.countDown();
    }
  }

  /**
   * Best-effort cleanup of the watcher thread; warns if the manager is still connected
   * after the wait, which would indicate a resource leak.
   */
  @Override
  public void finalize() {
    _watcher.interrupt();
    try {
      _watcher.join(DISCONNECT_WAIT_TIME_MS);
    } catch (InterruptedException e) {
      LOG.error("ClusterManager watcher cleanup in the finalize method was interrupted.", e);
    } finally {
      if (isConnected()) {
        LOG.warn(
            "The HelixManager ({}-{}-{}) is still connected after {} ms wait. This is a potential resource leakage!",
            _clusterName, _instanceName, _type.name(), DISCONNECT_WAIT_TIME_MS);
      }
    }
  }
}
+
diff --git
a/helix-gateway/src/test/java/org/apache/helix/gateway/base/manager/MockParticipantManager.java
b/helix-gateway/src/test/java/org/apache/helix/gateway/base/manager/MockParticipantManager.java
new file mode 100644
index 000000000..34047c525
--- /dev/null
+++
b/helix-gateway/src/test/java/org/apache/helix/gateway/base/manager/MockParticipantManager.java
@@ -0,0 +1,118 @@
+package org.apache.helix.gateway.base.manager;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.concurrent.CountDownLatch;
+
+import org.apache.helix.HelixCloudProperty;
+import org.apache.helix.HelixManagerProperty;
+import org.apache.helix.HelixPropertyFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.gateway.base.statemodel.MockOFModelFactory;
+import org.apache.helix.gateway.base.statemodel.MockTransition;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.participant.StateMachineEngine;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MockParticipantManager extends ClusterManager {
+ private static final Logger LOG =
LoggerFactory.getLogger(MockParticipantManager.class);
+
+ protected int _transDelay = 10;
+
+ protected MockOFModelFactory _ofModelFactory;
+ protected HelixCloudProperty _helixCloudProperty;
+
+ public MockParticipantManager(String zkAddr, String clusterName, String
instanceName) {
+ this(zkAddr, clusterName, instanceName, 10);
+ }
+
+ public MockParticipantManager(String zkAddr, String clusterName, String
instanceName,
+ int transDelay) {
+ this(zkAddr, clusterName, instanceName, transDelay, null);
+ }
+
+ public MockParticipantManager(String zkAddr, String clusterName, String
instanceName,
+ int transDelay, HelixCloudProperty helixCloudProperty) {
+ this(zkAddr, clusterName, instanceName, transDelay, helixCloudProperty,
+ HelixPropertyFactory.getInstance().getHelixManagerProperty(zkAddr,
clusterName));
+ }
+
+ public MockParticipantManager(String zkAddr, String clusterName, String
instanceName,
+ int transDelay, HelixCloudProperty helixCloudProperty,
+ HelixManagerProperty helixManagerProperty) {
+ super(clusterName, instanceName, InstanceType.PARTICIPANT, zkAddr, null,
helixManagerProperty);
+ _transDelay = transDelay;
+ _ofModelFactory = new MockOFModelFactory(null);
+ _helixCloudProperty = helixCloudProperty;
+ }
+
+ public MockParticipantManager(String clusterName, String instanceName,
+ HelixManagerProperty helixManagerProperty, int transDelay,
+ HelixCloudProperty helixCloudProperty) {
+ super(clusterName, instanceName, InstanceType.PARTICIPANT, null, null,
helixManagerProperty);
+ _transDelay = transDelay;
+ _ofModelFactory = new MockOFModelFactory();
+ _helixCloudProperty = helixCloudProperty;
+ }
+
+ public void setTransition(MockTransition transition) {
+ _ofModelFactory.setTrasition(transition);
+ }
+
+ /**
+ * This method should be called before syncStart() called after syncStop()
+ */
+ public void reset() {
+ syncStop();
+ _startCountDown = new CountDownLatch(1);
+ _stopCountDown = new CountDownLatch(1);
+ _waitStopFinishCountDown = new CountDownLatch(1);
+ }
+
+ @Override
+ public void run() {
+ try {
+ StateMachineEngine stateMach = getStateMachineEngine();
+
stateMach.registerStateModelFactory(BuiltInStateModelDefinitions.OnlineOffline.name(),
+ _ofModelFactory);
+ connect();
+ _startCountDown.countDown();
+
+ _stopCountDown.await();
+ } catch (InterruptedException e) {
+ String msg =
+ "participant: " + getInstanceName() + ", " +
Thread.currentThread().getName()
+ + " is interrupted";
+ LOG.info(msg);
+ } catch (Exception e) {
+ LOG.error("exception running participant-manager", e);
+ } finally {
+ _startCountDown.countDown();
+
+ disconnect();
+ _waitStopFinishCountDown.countDown();
+ }
+ }
+ @Override
+ public void finalize() {
+ super.finalize();
+ }
+}
diff --git
a/helix-gateway/src/test/java/org/apache/helix/gateway/base/statemodel/MockOFModelFactory.java
b/helix-gateway/src/test/java/org/apache/helix/gateway/base/statemodel/MockOFModelFactory.java
new file mode 100644
index 000000000..cf656b29d
--- /dev/null
+++
b/helix-gateway/src/test/java/org/apache/helix/gateway/base/statemodel/MockOFModelFactory.java
@@ -0,0 +1,54 @@
+package org.apache.helix.gateway.base.statemodel;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.participant.statemachine.StateModelFactory;
+
+// mock master slave state model factory
+public class MockOFModelFactory extends StateModelFactory<MockOFStateModel> {
+ private MockTransition _transition;
+
+ public MockOFModelFactory() {
+ this(null);
+ }
+
+ public MockOFModelFactory(MockTransition transition) {
+ _transition = transition;
+ }
+
+ public void setTrasition(MockTransition transition) {
+ _transition = transition;
+
+ // set existing transition
+ for (String resource : getResourceSet()) {
+ for (String partition : getPartitionSet(resource)) {
+ MockOFStateModel stateModel = getStateModel(resource, partition);
+ stateModel.setTransition(transition);
+ }
+ }
+ }
+
+ @Override
+ public MockOFStateModel createNewStateModel(String resourceName, String
partitionKey) {
+ MockOFStateModel model = new MockOFStateModel(_transition);
+
+ return model;
+ }
+}
diff --git
a/helix-gateway/src/test/java/org/apache/helix/gateway/base/statemodel/MockOFStateModel.java
b/helix-gateway/src/test/java/org/apache/helix/gateway/base/statemodel/MockOFStateModel.java
new file mode 100644
index 000000000..0e0350e1f
--- /dev/null
+++
b/helix-gateway/src/test/java/org/apache/helix/gateway/base/statemodel/MockOFStateModel.java
@@ -0,0 +1,65 @@
+package org.apache.helix.gateway.base.statemodel;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.apache.helix.participant.statemachine.StateModel;
+import org.apache.helix.participant.statemachine.StateModelInfo;
+import org.apache.helix.participant.statemachine.Transition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+// mock master-slave state model
+@StateModelInfo(initialState = "OFFLINE", states = {
+ "ONLINE", "ERROR"
+})
+public class MockOFStateModel extends StateModel {
+ private static Logger LOG = LoggerFactory.getLogger(MockOFStateModel.class);
+
+ protected MockTransition _transition;
+
+ public MockOFStateModel(MockTransition transition) {
+ _transition = transition;
+ }
+
+ public void setTransition(MockTransition transition) {
+ _transition = transition;
+ }
+
+ @Transition(to = "*", from = "*")
+ public void generalTransitionHandle(Message message, NotificationContext
context)
+ throws InterruptedException {
+ LOG.info(String
+ .format("Resource %s partition %s becomes %s from %s",
message.getResourceName(),
+ message.getPartitionName(), message.getToState(),
message.getFromState()));
+ if (_transition != null) {
+ _transition.doTransition(message, context);
+ }
+ }
+
+ @Override
+ public void reset() {
+ LOG.info("Default MockMSStateModel.reset() invoked");
+ if (_transition != null) {
+ _transition.doReset();
+ }
+ }
+}
diff --git
a/helix-gateway/src/test/java/org/apache/helix/gateway/base/statemodel/MockTransition.java
b/helix-gateway/src/test/java/org/apache/helix/gateway/base/statemodel/MockTransition.java
new file mode 100644
index 000000000..d61ba841d
--- /dev/null
+++
b/helix-gateway/src/test/java/org/apache/helix/gateway/base/statemodel/MockTransition.java
@@ -0,0 +1,41 @@
+package org.apache.helix.gateway.base.statemodel;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import org.apache.helix.NotificationContext;
+import org.apache.helix.model.Message;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MockTransition {
+ private static Logger LOG = LoggerFactory.getLogger(MockTransition.class);
+
+ // called by state model transition functions
+ public void doTransition(Message message, NotificationContext context)
+ throws InterruptedException {
+ LOG.info("default doTransition() invoked");
+ }
+
+ // called by state model reset function
+ public void doReset() {
+ LOG.info("default doReset() invoked");
+ }
+
+}
diff --git
a/helix-gateway/src/test/java/org/apache/helix/gateway/base/util/TestHelper.java
b/helix-gateway/src/test/java/org/apache/helix/gateway/base/util/TestHelper.java
new file mode 100644
index 000000000..681f39205
--- /dev/null
+++
b/helix-gateway/src/test/java/org/apache/helix/gateway/base/util/TestHelper.java
@@ -0,0 +1,817 @@
+package org.apache.helix.gateway.base.util;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.reflect.Method;
+import java.net.ServerSocket;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.Callable;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.io.FileUtils;
+import org.apache.helix.BaseDataAccessor;
+import org.apache.helix.HelixManager;
+import org.apache.helix.PropertyKey.Builder;
+import
org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy;
+import org.apache.helix.controller.rebalancer.strategy.RebalanceStrategy;
+import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.apache.helix.manager.zk.ZNRecordSerializer;
+import org.apache.helix.manager.zk.ZkBaseDataAccessor;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.IdealState.RebalanceMode;
+import org.apache.helix.model.Message;
+import org.apache.helix.model.Message.MessageType;
+import org.apache.helix.model.StateModelDefinition;
+import
org.apache.helix.model.StateModelDefinition.StateModelDefinitionProperty;
+import org.apache.helix.store.zk.ZNode;
+import org.apache.helix.tools.ClusterSetup;
+import org.apache.helix.util.ZKClientPool;
+import org.apache.helix.zookeeper.api.client.HelixZkClient;
+import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
+import org.apache.helix.zookeeper.zkclient.IDefaultNameSpace;
+import org.apache.helix.zookeeper.zkclient.ZkClient;
+import org.apache.helix.zookeeper.zkclient.ZkServer;
+import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException;
+import org.apache.zookeeper.data.Stat;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+
/**
 * Static helpers shared by Helix gateway integration tests: embedded ZooKeeper server
 * lifecycle, cluster setup/teardown, polling verifiers and debugging dumps.
 */
public class TestHelper {
  private static final Logger LOG = LoggerFactory.getLogger(TestHelper.class);
  // Default wait (ms) for test verifications — presumably consumed by callers outside
  // this chunk; no in-file usage visible. TODO confirm against test base classes.
  public static final long WAIT_DURATION = 60 * 1000L; // 60 seconds
  // Wait (ms) to let a rebalance pipeline settle — same caveat as above.
  public static final int DEFAULT_REBALANCE_PROCESSING_WAIT_TIME = 1500;
+ /**
+ * Returns a unused random port.
+ */
+ public static int getRandomPort() throws IOException {
+ ServerSocket sock = new ServerSocket();
+ sock.bind(null);
+ int port = sock.getLocalPort();
+ sock.close();
+
+ return port;
+ }
+
+ static public ZkServer startZkServer(final String zkAddress) throws
Exception {
+ List<String> empty = Collections.emptyList();
+ return TestHelper.startZkServer(zkAddress, empty, true);
+ }
+
+ static public ZkServer startZkServer(final String zkAddress, final String
rootNamespace)
+ throws Exception {
+ List<String> rootNamespaces = new ArrayList<String>();
+ rootNamespaces.add(rootNamespace);
+ return TestHelper.startZkServer(zkAddress, rootNamespaces, true);
+ }
+
+ static public ZkServer startZkServer(final String zkAddress, final
List<String> rootNamespaces)
+ throws Exception {
+ return startZkServer(zkAddress, rootNamespaces, true);
+ }
+
+ static public ZkServer startZkServer(final String zkAddress, final
List<String> rootNamespaces,
+ boolean overwrite) throws Exception {
+ System.out.println(
+ "Start zookeeper at " + zkAddress + " in thread " +
Thread.currentThread().getName());
+
+ String zkDir = zkAddress.replace(':', '_');
+ final String logDir = "/tmp/" + zkDir + "/logs";
+ final String dataDir = "/tmp/" + zkDir + "/dataDir";
+ if (overwrite) {
+ FileUtils.deleteDirectory(new File(dataDir));
+ FileUtils.deleteDirectory(new File(logDir));
+ }
+ ZKClientPool.reset();
+
+ IDefaultNameSpace defaultNameSpace = new IDefaultNameSpace() {
+ @Override
+ public void createDefaultNameSpace(ZkClient zkClient) {
+ if (rootNamespaces == null) {
+ return;
+ }
+
+ for (String rootNamespace : rootNamespaces) {
+ try {
+ zkClient.deleteRecursive(rootNamespace);
+ } catch (Exception e) {
+ LOG.error("fail to deleteRecursive path:" + rootNamespace, e);
+ }
+ }
+ }
+ };
+
+ int port = Integer.parseInt(zkAddress.substring(zkAddress.lastIndexOf(':')
+ 1));
+ ZkServer zkServer = new ZkServer(dataDir, logDir, defaultNameSpace, port);
+ zkServer.start();
+
+ return zkServer;
+ }
+
+ static public void stopZkServer(ZkServer zkServer) {
+ if (zkServer != null) {
+ zkServer.shutdown();
+ System.out.println(
+ "Shut down zookeeper at port " + zkServer.getPort() + " in thread "
+ Thread
+ .currentThread().getName());
+ }
+ }
+
  /**
   * Creates a cluster with no resources/instances via the given ZK client.
   * NOTE(review): the {@code true} flag presumably means "recreate if exists" —
   * confirm against ZKHelixAdmin#addCluster. The caller's client is wrapped, not
   * closed here.
   */
  public static void setupEmptyCluster(HelixZkClient zkClient, String clusterName) {
    ZKHelixAdmin admin = new ZKHelixAdmin(zkClient);
    admin.addCluster(clusterName, true);
  }
+
+ /**
+ * convert T[] to set<T>
+ * @param s
+ * @return
+ */
+ public static <T> Set<T> setOf(T... s) {
+ Set<T> set = new HashSet<T>(Arrays.asList(s));
+ return set;
+ }
+
+ /**
+ * generic method for verification with a timeout
+ * @param verifierName
+ * @param args
+ */
+ public static void verifyWithTimeout(String verifierName, long timeout,
Object... args) {
+ final long sleepInterval = 1000; // in ms
+ final int loop = (int) (timeout / sleepInterval) + 1;
+ try {
+ boolean result = false;
+ int i = 0;
+ for (; i < loop; i++) {
+ Thread.sleep(sleepInterval);
+ // verifier should be static method
+ result = (Boolean) TestHelper.getMethod(verifierName).invoke(null,
args);
+
+ if (result == true) {
+ break;
+ }
+ }
+
+ // debug
+ // LOG.info(verifierName + ": wait " + ((i + 1) * 1000) + "ms to verify
("
+ // + result + ")");
+ System.err.println(
+ verifierName + ": wait " + ((i + 1) * 1000) + "ms to verify " + " ("
+ result + ")");
+ LOG.debug("args:" + Arrays.toString(args));
+ // System.err.println("args:" + Arrays.toString(args));
+
+ if (result == false) {
+ LOG.error(verifierName + " fails");
+ LOG.error("args:" + Arrays.toString(args));
+ }
+
+ Assert.assertTrue(result);
+ } catch (Exception e) {
+ LOG.error("Exception in verify: " + verifierName, e);
+ }
+ }
+
+ private static Method getMethod(String name) {
+ Method[] methods = TestHelper.class.getMethods();
+ for (Method method : methods) {
+ if (name.equals(method.getName())) {
+ return method;
+ }
+ }
+ return null;
+ }
+
  /**
   * Verifies that, for each given instance, every session's current state (regular
   * and task-framework) for {@code resourceName} is empty, and that the resource's
   * external view carries no partition assignments.
   * @return true when no current-state or external-view entries remain
   */
  public static boolean verifyEmptyCurStateAndExtView(String clusterName, String resourceName,
      Set<String> instanceNames, String zkAddr) {
    HelixZkClient zkClient = SharedZkClientFactory.getInstance()
        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
    zkClient.setZkSerializer(new ZNRecordSerializer());

    try {
      ZKHelixDataAccessor accessor =
          new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
      Builder keyBuilder = accessor.keyBuilder();

      for (String instanceName : instanceNames) {
        List<String> sessionIds = accessor.getChildNames(keyBuilder.sessions(instanceName));

        for (String sessionId : sessionIds) {
          CurrentState curState =
              accessor.getProperty(keyBuilder.currentState(instanceName, sessionId, resourceName));

          // Non-empty map fields mean partitions still hold state on this session.
          if (curState != null && curState.getRecord().getMapFields().size() != 0) {
            return false;
          }

          CurrentState taskCurState =
              accessor.getProperty(keyBuilder.taskCurrentState(instanceName, sessionId, resourceName));

          if (taskCurState != null && taskCurState.getRecord().getMapFields().size() != 0) {
            return false;
          }
        }

        ExternalView extView = accessor.getProperty(keyBuilder.externalView(resourceName));

        if (extView != null && extView.getRecord().getMapFields().size() != 0) {
          return false;
        }
      }

      return true;
    } finally {
      // Always release the shared ZK client, even on an early return.
      zkClient.close();
    }
  }
+
  /** Returns true when the given manager is no longer connected to the cluster. */
  public static boolean verifyNotConnected(HelixManager manager) {
    return !manager.isConnected();
  }
+
+ public static void setupCluster(String clusterName, String zkAddr, int
startPort,
+ String participantNamePrefix, String resourceNamePrefix, int resourceNb,
int partitionNb,
+ int nodesNb, int replica, String stateModelDef, boolean doRebalance)
throws Exception {
+ TestHelper
+ .setupCluster(clusterName, zkAddr, startPort, participantNamePrefix,
resourceNamePrefix,
+ resourceNb, partitionNb, nodesNb, replica, stateModelDef,
RebalanceMode.SEMI_AUTO,
+ doRebalance);
+ }
+
  /**
   * Creates a cluster with the given instances and resources, deleting any
   * pre-existing cluster of the same name first.
   * @param startPort port of the first participant; participant i is named
   *          {@code participantNamePrefix + "_" + (startPort + i)}
   * @param mode rebalance mode; FULL_AUTO uses the CrushEd strategy, others the default
   * @param doRebalance when true, run an initial rebalance for each resource
   */
  public static void setupCluster(String clusterName, String zkAddr, int startPort,
      String participantNamePrefix, String resourceNamePrefix, int resourceNb, int partitionNb,
      int nodesNb, int replica, String stateModelDef, RebalanceMode mode, boolean doRebalance) {
    HelixZkClient zkClient = SharedZkClientFactory.getInstance()
        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
    try {
      if (zkClient.exists("/" + clusterName)) {
        LOG.warn("Cluster already exists:" + clusterName + ". Deleting it");
        zkClient.deleteRecursively("/" + clusterName);
      }

      ClusterSetup setupTool = new ClusterSetup(zkAddr);
      setupTool.addCluster(clusterName, true);

      for (int i = 0; i < nodesNb; i++) {
        int port = startPort + i;
        setupTool.addInstanceToCluster(clusterName, participantNamePrefix + "_" + port);
      }

      for (int i = 0; i < resourceNb; i++) {
        String resourceName = resourceNamePrefix + i;
        setupTool.addResourceToCluster(clusterName, resourceName, partitionNb, stateModelDef,
            mode.name(),
            mode == RebalanceMode.FULL_AUTO ? CrushEdRebalanceStrategy.class.getName()
                : RebalanceStrategy.DEFAULT_REBALANCE_STRATEGY);
        if (doRebalance) {
          setupTool.rebalanceStorageCluster(clusterName, resourceName, replica);
        }
      }
    } finally {
      // Release the shared client even if setup fails part-way.
      zkClient.close();
    }
  }
+
+ public static void dropCluster(String clusterName, RealmAwareZkClient
zkClient) {
+ ClusterSetup setupTool = new ClusterSetup(zkClient);
+ dropCluster(clusterName, zkClient, setupTool);
+ }
+
+ public static void dropCluster(String clusterName, RealmAwareZkClient
zkClient, ClusterSetup setup) {
+ String namespace = "/" + clusterName;
+ if (zkClient.exists(namespace)) {
+ try {
+ setup.deleteCluster(clusterName);
+ } catch (Exception ex) {
+ // Failed to delete, give some more time for connections to drop
+ try {
+ Thread.sleep(3000L);
+ setup.deleteCluster(clusterName);
+ } catch (Exception ignored) {
+ // OK - just ignore
+ }
+ }
+ }
+ }
+
  /**
   * Asserts (via TestNG) that each "ResourceName/partitionKey" in {@code stateMap}
   * shows the expected {@code state} for every listed instance, according to the
   * resource's external view.
   * @param stateMap "ResourceName/partitionKey" -> set of instances expected in {@code state}
   * @param state expected state name, e.g. MASTER|SLAVE|ERROR
   */
  public static void verifyState(String clusterName, String zkAddr,
      Map<String, Set<String>> stateMap, String state) {
    HelixZkClient zkClient = SharedZkClientFactory.getInstance()
        .buildZkClient(new HelixZkClient.ZkConnectionConfig(zkAddr));
    zkClient.setZkSerializer(new ZNRecordSerializer());

    try {
      ZKHelixDataAccessor accessor =
          new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(zkClient));
      Builder keyBuilder = accessor.keyBuilder();

      for (String resGroupPartitionKey : stateMap.keySet()) {
        Map<String, String> retMap = getResourceAndPartitionKey(resGroupPartitionKey);
        String resGroup = retMap.get("RESOURCE");
        String partitionKey = retMap.get("PARTITION");

        ExternalView extView = accessor.getProperty(keyBuilder.externalView(resGroup));
        for (String instance : stateMap.get(resGroupPartitionKey)) {
          String actualState = extView.getStateMap(partitionKey).get(instance);
          Assert.assertNotNull(actualState,
              "externalView doesn't contain state for " + resGroup + "/" + partitionKey + " on "
                  + instance + " (expect " + state + ")");

          Assert.assertEquals(actualState, state,
              "externalView for " + resGroup + "/" + partitionKey + " on " + instance + " is "
                  + actualState + " (expect " + state + ")");
        }
      }
    } finally {
      zkClient.close();
    }
  }
+
+ /**
+ * @param resourcePartition
+ * : key is in form of "resource/partitionKey" or "resource_x"
+ * @return
+ */
+ private static Map<String, String> getResourceAndPartitionKey(String
resourcePartition) {
+ String resourceName;
+ String partitionName;
+ int idx = resourcePartition.indexOf('/');
+ if (idx > -1) {
+ resourceName = resourcePartition.substring(0, idx);
+ partitionName = resourcePartition.substring(idx + 1);
+ } else {
+ idx = resourcePartition.lastIndexOf('_');
+ resourceName = resourcePartition.substring(0, idx);
+ partitionName = resourcePartition;
+ }
+
+ Map<String, String> retMap = new HashMap<String, String>();
+ retMap.put("RESOURCE", resourceName);
+ retMap.put("PARTITION", partitionName);
+ return retMap;
+ }
+
+ public static <T> Map<String, T> startThreadsConcurrently(final int
nrThreads,
+ final Callable<T> method, final long timeout) {
+ final CountDownLatch startLatch = new CountDownLatch(1);
+ final CountDownLatch finishCounter = new CountDownLatch(nrThreads);
+ final Map<String, T> resultsMap = new ConcurrentHashMap<String, T>();
+ final List<Thread> threadList = new ArrayList<Thread>();
+
+ for (int i = 0; i < nrThreads; i++) {
+ Thread thread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ boolean isTimeout = !startLatch.await(timeout, TimeUnit.SECONDS);
+ if (isTimeout) {
+ LOG.error("Timeout while waiting for start latch");
+ }
+ } catch (InterruptedException ex) {
+ LOG.error("Interrupted while waiting for start latch");
+ }
+
+ try {
+ T result = method.call();
+ if (result != null) {
+ resultsMap.put("thread_" + this.getId(), result);
+ }
+ LOG.debug("result=" + result);
+ } catch (Exception e) {
+ LOG.error("Exeption in executing " + method.getClass().getName(),
e);
+ }
+
+ finishCounter.countDown();
+ }
+ };
+ threadList.add(thread);
+ thread.start();
+ }
+ startLatch.countDown();
+
+ // wait for all thread to complete
+ try {
+ boolean isTimeout = !finishCounter.await(timeout, TimeUnit.SECONDS);
+ if (isTimeout) {
+ LOG.error("Timeout while waiting for finish latch. Interrupt all
threads");
+ for (Thread thread : threadList) {
+ thread.interrupt();
+ }
+ }
+ } catch (InterruptedException e) {
+ LOG.error("Interrupted while waiting for finish latch", e);
+ }
+
+ return resultsMap;
+ }
+
  /**
   * Builds a STATE_TRANSITION message for tests.
   * NOTE(review): the state model def is hard-coded to "MasterSlave" regardless of
   * the from/to states passed in — confirm this is intended at every call site.
   */
  public static Message createMessage(String msgId, String fromState, String toState,
      String tgtName, String resourceName, String partitionName) {
    Message msg = new Message(MessageType.STATE_TRANSITION, msgId);
    msg.setFromState(fromState);
    msg.setToState(toState);
    msg.setTgtName(tgtName);
    msg.setResourceName(resourceName);
    msg.setPartitionName(partitionName);
    msg.setStateModelDef("MasterSlave");

    return msg;
  }
+
  /**
   * Returns the name of the immediate caller's method.
   * Relies on a fixed stack depth (frame 0: getStackTrace, frame 1: this method,
   * frame 2: the caller) — do not add wrapper calls around this helper.
   */
  public static String getTestMethodName() {
    StackTraceElement[] calls = Thread.currentThread().getStackTrace();
    return calls[2].getMethodName();
  }
+
  /**
   * Returns the simple class name of the immediate caller.
   * Same fixed-stack-depth caveat as {@link #getTestMethodName()}: frame 2 must be
   * the caller, so do not wrap this helper.
   */
  public static String getTestClassName() {
    StackTraceElement[] calls = Thread.currentThread().getStackTrace();
    String fullClassName = calls[2].getClassName();
    // Strip the package prefix, keeping only the simple name.
    return fullClassName.substring(fullClassName.lastIndexOf('.') + 1);
  }
+
+ public static <T> Map<String, T> startThreadsConcurrently(final
List<Callable<T>> methods,
+ final long timeout) {
+ final int nrThreads = methods.size();
+ final CountDownLatch startLatch = new CountDownLatch(1);
+ final CountDownLatch finishCounter = new CountDownLatch(nrThreads);
+ final Map<String, T> resultsMap = new ConcurrentHashMap<String, T>();
+ final List<Thread> threadList = new ArrayList<Thread>();
+
+ for (int i = 0; i < nrThreads; i++) {
+ final Callable<T> method = methods.get(i);
+
+ Thread thread = new Thread() {
+ @Override
+ public void run() {
+ try {
+ boolean isTimeout = !startLatch.await(timeout, TimeUnit.SECONDS);
+ if (isTimeout) {
+ LOG.error("Timeout while waiting for start latch");
+ }
+ } catch (InterruptedException ex) {
+ LOG.error("Interrupted while waiting for start latch");
+ }
+
+ try {
+ T result = method.call();
+ if (result != null) {
+ resultsMap.put("thread_" + this.getId(), result);
+ }
+ LOG.debug("result=" + result);
+ } catch (Exception e) {
+ LOG.error("Exeption in executing " + method.getClass().getName(),
e);
+ }
+
+ finishCounter.countDown();
+ }
+ };
+ threadList.add(thread);
+ thread.start();
+ }
+ startLatch.countDown();
+
+ // wait for all thread to complete
+ try {
+ boolean isTimeout = !finishCounter.await(timeout, TimeUnit.SECONDS);
+ if (isTimeout) {
+ LOG.error("Timeout while waiting for finish latch. Interrupt all
threads");
+ for (Thread thread : threadList) {
+ thread.interrupt();
+ }
+ }
+ } catch (InterruptedException e) {
+ LOG.error("Interrupted while waiting for finish latch", e);
+ }
+
+ return resultsMap;
+ }
+
+ public static void printCache(Map<String, ZNode> cache) {
+ System.out.println("START:Print cache");
+ TreeMap<String, ZNode> map = new TreeMap<String, ZNode>();
+ map.putAll(cache);
+
+ for (String key : map.keySet()) {
+ ZNode node = map.get(key);
+ TreeSet<String> childSet = new TreeSet<String>();
+ childSet.addAll(node.getChildSet());
+ System.out.print(
+ key + "=" + node.getData() + ", " + childSet + ", " +
(node.getStat() == null ? "null\n"
+ : node.getStat()));
+ }
+ System.out.println("END:Print cache");
+ }
+
  /**
   * Recursively reads the subtree at {@code path} into {@code map} (path -> ZNode)
   * using the raw ZK client. Nodes deleted mid-walk are silently skipped.
   */
  public static void readZkRecursive(String path, Map<String, ZNode> map, HelixZkClient zkclient) {
    try {
      Stat stat = new Stat();
      ZNRecord record = zkclient.readData(path, stat);
      List<String> childNames = zkclient.getChildren(path);
      ZNode node = new ZNode(path, record, stat);
      node.addChildren(childNames);
      map.put(path, node);

      for (String childName : childNames) {
        String childPath = path + "/" + childName;
        readZkRecursive(childPath, map, zkclient);
      }
    } catch (ZkNoNodeException e) {
      // Node disappeared between listing and read: OK to skip.
    }
  }
+
  /**
   * Recursively reads the subtree at {@code path} into {@code map} (path -> ZNode)
   * using a BaseDataAccessor. Unlike the ZK-client variant, the child list may be
   * null here, so it is guarded before recursing. Missing nodes are silently skipped.
   */
  public static void readZkRecursive(String path, Map<String, ZNode> map,
      BaseDataAccessor<ZNRecord> zkAccessor) {
    try {
      Stat stat = new Stat();
      ZNRecord record = zkAccessor.get(path, stat, 0);
      List<String> childNames = zkAccessor.getChildNames(path, 0);
      ZNode node = new ZNode(path, record, stat);
      node.addChildren(childNames);
      map.put(path, node);

      if (childNames != null && !childNames.isEmpty()) {
        for (String childName : childNames) {
          String childPath = path + "/" + childName;
          readZkRecursive(childPath, map, zkAccessor);
        }
      }
    } catch (ZkNoNodeException e) {
      // Node disappeared between listing and read: OK to skip.
    }
  }
+
  /**
   * Reads each path's subtree via both the raw ZK client and the accessor, then
   * compares the two snapshots with the map-based variant.
   */
  public static boolean verifyZkCache(List<String> paths, BaseDataAccessor<ZNRecord> zkAccessor,
      HelixZkClient zkclient, boolean needVerifyStat) {
    // read everything
    Map<String, ZNode> zkMap = new HashMap<String, ZNode>();
    Map<String, ZNode> cache = new HashMap<String, ZNode>();
    for (String path : paths) {
      readZkRecursive(path, zkMap, zkclient);
      readZkRecursive(path, cache, zkAccessor);
    }

    return verifyZkCache(paths, null, cache, zkMap, needVerifyStat);
  }
+
  /** Overload without stat-exclusion paths; see the five-argument variant. */
  public static boolean verifyZkCache(List<String> paths, Map<String, ZNode> cache,
      HelixZkClient zkclient, boolean needVerifyStat) {
    return verifyZkCache(paths, null, cache, zkclient, needVerifyStat);
  }
+
  /**
   * Reads the live ZK subtrees for {@code paths} and compares them with {@code cache},
   * skipping stat comparison for paths listed in {@code pathsExcludeForStat}.
   */
  public static boolean verifyZkCache(List<String> paths, List<String> pathsExcludeForStat,
      Map<String, ZNode> cache, HelixZkClient zkclient, boolean needVerifyStat) {
    // read everything on zk under paths
    Map<String, ZNode> zkMap = new HashMap<String, ZNode>();
    for (String path : paths) {
      readZkRecursive(path, zkMap, zkclient);
    }

    return verifyZkCache(paths, pathsExcludeForStat, cache, zkMap, needVerifyStat);
  }
+
+ public static boolean verifyZkCache(List<String> paths, List<String>
pathsExcludeForStat,
+ Map<String, ZNode> cache, Map<String, ZNode> zkMap, boolean
needVerifyStat) {
+ // equal size
+ if (zkMap.size() != cache.size()) {
+ System.err
+ .println("size mismatch: cacheSize: " + cache.size() + ", zkMapSize:
" + zkMap.size());
+ System.out.println("cache: (" + cache.size() + ")");
+ TestHelper.printCache(cache);
+
+ System.out.println("zkMap: (" + zkMap.size() + ")");
+ TestHelper.printCache(zkMap);
+
+ return false;
+ }
+
+ // everything in cache is also in map
+ for (String path : cache.keySet()) {
+ ZNode cacheNode = cache.get(path);
+ ZNode zkNode = zkMap.get(path);
+
+ if (zkNode == null) {
+ // in cache but not on zk
+ System.err.println("path: " + path + " in cache but not on zk:
inCacheNode: " + cacheNode);
+ return false;
+ }
+
+ if ((zkNode.getData() == null && cacheNode.getData() != null) ||
(zkNode.getData() != null
+ && cacheNode.getData() == null) || (zkNode.getData() != null
+ && cacheNode.getData() != null &&
!zkNode.getData().equals(cacheNode.getData()))) {
+ // data not equal
+ System.err.println(
+ "data mismatch on path: " + path + ", inCache: " +
cacheNode.getData() + ", onZk: "
+ + zkNode.getData());
+ return false;
+ }
+
+ if ((zkNode.getChildSet() == null && cacheNode.getChildSet() != null) ||
(
+ zkNode.getChildSet() != null && cacheNode.getChildSet() == null) || (
+ zkNode.getChildSet() != null && cacheNode.getChildSet() != null &&
!zkNode.getChildSet()
+ .equals(cacheNode.getChildSet()))) {
+ // childSet not equal
+ System.err.println(
+ "childSet mismatch on path: " + path + ", inCache: " +
cacheNode.getChildSet()
+ + ", onZk: " + zkNode.getChildSet());
+ return false;
+ }
+
+ if (needVerifyStat && pathsExcludeForStat != null &&
!pathsExcludeForStat.contains(path)) {
+ if (cacheNode.getStat() == null ||
!zkNode.getStat().equals(cacheNode.getStat())) {
+ // stat not equal
+ System.err.println(
+ "Stat mismatch on path: " + path + ", inCache: " +
cacheNode.getStat() + ", onZk: "
+ + zkNode.getStat());
+ return false;
+ }
+ }
+ }
+
+ return true;
+ }
+
+ public static StateModelDefinition generateStateModelDefForBootstrap() {
+ ZNRecord record = new ZNRecord("Bootstrap");
+
record.setSimpleField(StateModelDefinitionProperty.INITIAL_STATE.toString(),
"IDLE");
+ List<String> statePriorityList = new ArrayList<String>();
+ statePriorityList.add("ONLINE");
+ statePriorityList.add("BOOTSTRAP");
+ statePriorityList.add("OFFLINE");
+ statePriorityList.add("IDLE");
+ statePriorityList.add("DROPPED");
+ statePriorityList.add("ERROR");
+
record.setListField(StateModelDefinitionProperty.STATE_PRIORITY_LIST.toString(),
+ statePriorityList);
+ for (String state : statePriorityList) {
+ String key = state + ".meta";
+ Map<String, String> metadata = new HashMap<String, String>();
+ if (state.equals("ONLINE")) {
+ metadata.put("count",
StateModelDefinition.STATE_REPLICA_COUNT_ALL_REPLICAS);
+ record.setMapField(key, metadata);
+ } else if (state.equals("BOOTSTRAP")) {
+ metadata.put("count", "-1");
+ record.setMapField(key, metadata);
+ } else if (state.equals("OFFLINE")) {
+ metadata.put("count", "-1");
+ record.setMapField(key, metadata);
+ } else if (state.equals("IDLE")) {
+ metadata.put("count", "-1");
+ record.setMapField(key, metadata);
+ } else if (state.equals("DROPPED")) {
+ metadata.put("count", "-1");
+ record.setMapField(key, metadata);
+ } else if (state.equals("ERROR")) {
+ metadata.put("count", "-1");
+ record.setMapField(key, metadata);
+ }
+ }
+
+ for (String state : statePriorityList) {
+ String key = state + ".next";
+ if (state.equals("ONLINE")) {
+ Map<String, String> metadata = new HashMap<String, String>();
+ metadata.put("BOOTSTRAP", "OFFLINE");
+ metadata.put("OFFLINE", "OFFLINE");
+ metadata.put("DROPPED", "OFFLINE");
+ metadata.put("IDLE", "OFFLINE");
+ record.setMapField(key, metadata);
+ } else if (state.equals("BOOTSTRAP")) {
+ Map<String, String> metadata = new HashMap<String, String>();
+ metadata.put("ONLINE", "ONLINE");
+ metadata.put("OFFLINE", "OFFLINE");
+ metadata.put("DROPPED", "OFFLINE");
+ metadata.put("IDLE", "OFFLINE");
+ record.setMapField(key, metadata);
+ } else if (state.equals("OFFLINE")) {
+ Map<String, String> metadata = new HashMap<String, String>();
+ metadata.put("ONLINE", "BOOTSTRAP");
+ metadata.put("BOOTSTRAP", "BOOTSTRAP");
+ metadata.put("DROPPED", "IDLE");
+ metadata.put("IDLE", "IDLE");
+ record.setMapField(key, metadata);
+ } else if (state.equals("IDLE")) {
+ Map<String, String> metadata = new HashMap<String, String>();
+ metadata.put("ONLINE", "OFFLINE");
+ metadata.put("BOOTSTRAP", "OFFLINE");
+ metadata.put("OFFLINE", "OFFLINE");
+ metadata.put("DROPPED", "DROPPED");
+ record.setMapField(key, metadata);
+ } else if (state.equals("ERROR")) {
+ Map<String, String> metadata = new HashMap<String, String>();
+ metadata.put("IDLE", "IDLE");
+ record.setMapField(key, metadata);
+ }
+ }
+ List<String> stateTransitionPriorityList = new ArrayList<String>();
+ stateTransitionPriorityList.add("ONLINE-OFFLINE");
+ stateTransitionPriorityList.add("BOOTSTRAP-ONLINE");
+ stateTransitionPriorityList.add("OFFLINE-BOOTSTRAP");
+ stateTransitionPriorityList.add("BOOTSTRAP-OFFLINE");
+ stateTransitionPriorityList.add("OFFLINE-IDLE");
+ stateTransitionPriorityList.add("IDLE-OFFLINE");
+ stateTransitionPriorityList.add("IDLE-DROPPED");
+ stateTransitionPriorityList.add("ERROR-IDLE");
+
record.setListField(StateModelDefinitionProperty.STATE_TRANSITION_PRIORITYLIST.toString(),
+ stateTransitionPriorityList);
+ return new StateModelDefinition(record);
+ }
+
+ public static String znrecordToString(ZNRecord record) {
+ StringBuffer sb = new StringBuffer();
+ sb.append(record.getId() + "\n");
+ Map<String, String> simpleFields = record.getSimpleFields();
+ if (simpleFields != null) {
+ sb.append("simpleFields\n");
+ for (String key : simpleFields.keySet()) {
+ sb.append(" " + key + "\t: " + simpleFields.get(key) + "\n");
+ }
+ }
+
+ Map<String, List<String>> listFields = record.getListFields();
+ sb.append("listFields\n");
+ for (String key : listFields.keySet()) {
+ List<String> list = listFields.get(key);
+ sb.append(" " + key + "\t: ");
+ for (String listValue : list) {
+ sb.append(listValue + ", ");
+ }
+ sb.append("\n");
+ }
+
+ Map<String, Map<String, String>> mapFields = record.getMapFields();
+ sb.append("mapFields\n");
+ for (String key : mapFields.keySet()) {
+ Map<String, String> map = mapFields.get(key);
+ sb.append(" " + key + "\t: \n");
+ for (String mapKey : map.keySet()) {
+ sb.append(" " + mapKey + "\t: " + map.get(mapKey) + "\n");
+ }
+ }
+
+ return sb.toString();
+ }
+
+ public interface Verifier {
+ boolean verify() throws Exception;
+ }
+
+ public static boolean verify(Verifier verifier, long timeout) throws
Exception {
+ long start = System.currentTimeMillis();
+ do {
+ boolean result = verifier.verify();
+ boolean isTimedout = (System.currentTimeMillis() - start) > timeout;
+ if (result || isTimedout) {
+ if (isTimedout && !result) {
+ LOG.error("verifier time out, consider try longer timeout, stack
trace{}",
+ Arrays.asList(Thread.currentThread().getStackTrace()));
+ }
+ return result;
+ }
+ Thread.sleep(50);
+ } while (true);
+ }
+}