This is an automated email from the ASF dual-hosted git repository. jackie pushed a commit to branch helix_setup in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git
commit db6fdf8da0321d1a1ef283bdb3becd64ad6c4068 Author: Jackie (Xiaotian) Jiang <[email protected]> AuthorDate: Thu Jul 11 18:20:42 2019 -0700 Cleanup un-necessary setups in HelixSetupUtils - Do not enable leadControllerResource by default (no auto-rebalance) - Remove redundant empty brokerResource resource config - Remove redundant empty leadControllerResource resource config - No need to initialize property store sub-directories --- .../apache/pinot/controller/ControllerStarter.java | 20 +- .../helix/core/util/HelixSetupUtils.java | 213 +++++++------------- .../helix/ControllerPeriodicTaskStarterTest.java | 38 +--- .../pinot/controller/helix/ControllerTest.java | 54 ++--- .../controller/helix/PinotControllerModeTest.java | 217 +++++++++++---------- .../helix/core/PinotHelixResourceManagerTest.java | 107 +++++----- .../test/java/org/apache/pinot/util/TestUtils.java | 39 ++-- 7 files changed, 284 insertions(+), 404 deletions(-) diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java index c313616..ee2fda1 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/ControllerStarter.java @@ -19,6 +19,7 @@ package org.apache.pinot.controller; import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; import com.google.common.primitives.Longs; import com.google.common.util.concurrent.ThreadFactoryBuilder; import com.yammer.metrics.core.MetricsRegistry; @@ -31,6 +32,7 @@ import java.util.List; import java.util.concurrent.Executor; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; import org.apache.commons.configuration.Configuration; import org.apache.commons.httpclient.HttpConnectionManager; import org.apache.commons.httpclient.MultiThreadedHttpConnectionManager; @@ -219,7 +221,7 @@ public class ControllerStarter { private void setUpHelixController() { // Register and connect instance as Helix controller. LOGGER.info("Starting Helix controller"); - _helixControllerManager = HelixSetupUtils.setup(_helixClusterName, _helixZkURL, _instanceId); + _helixControllerManager = HelixSetupUtils.setupHelixController(_helixClusterName, _helixZkURL, _instanceId); // Emit helix controller metrics _controllerMetrics.addCallbackGauge(CommonConstants.Helix.INSTANCE_CONNECTED_METRIC_NAME, @@ -236,12 +238,10 @@ public class ControllerStarter { } private void setUpPinotController() { - // Note: Right now we don't allow pinot-only mode to be used in production yet. - // Now we only have this mode used in tests. - // TODO: Remove this logic once all the helix separation PRs are committed. - if (_controllerMode == ControllerConf.ControllerMode.PINOT_ONLY && !isPinotOnlyModeSupported()) { - throw new RuntimeException("Pinot only controller currently isn't supported in production yet."); - } + // Note: Right now we don't allow Pinot-only controller as ControllerLeadershipManager is setup in Helix controller + // and Pinot controller relies on it + // TODO: Remove ControllerLeadershipManager + Preconditions.checkState(_controllerLeadershipManager != null); // Set up Pinot cluster in Helix HelixSetupUtils.setupPinotCluster(_helixClusterName, _helixZkURL, _isUpdateStateModel, _enableBatchMessageMode); @@ -500,16 +500,14 @@ public class ControllerStarter { LOGGER.info("Stopping resource manager"); _helixResourceManager.stop(); + LOGGER.info("Shutting down executor service"); _executorService.shutdownNow(); + _executorService.awaitTermination(10L, TimeUnit.SECONDS); } catch (final Exception e) { LOGGER.error("Caught exception while shutting down", e); } } - public boolean isPinotOnlyModeSupported() { - return false; - } - public MetricsRegistry getMetricsRegistry() { return _metricsRegistry; } diff --git a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java index 82c4cab..dfa4979 100644 --- a/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java +++ b/pinot-controller/src/main/java/org/apache/pinot/controller/helix/core/util/HelixSetupUtils.java @@ -20,16 +20,10 @@ package org.apache.pinot.controller.helix.core.util; import com.google.common.base.Preconditions; import java.util.Collections; -import java.util.HashMap; -import java.util.List; import java.util.concurrent.TimeUnit; -import org.apache.helix.AccessOption; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixDataAccessor; import org.apache.helix.HelixManager; -import org.apache.helix.PropertyKey; -import org.apache.helix.PropertyPathBuilder; -import org.apache.helix.ZNRecord; import org.apache.helix.controller.HelixControllerMain; import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy; import org.apache.helix.manager.zk.ZKHelixAdmin; @@ -37,44 +31,35 @@ import org.apache.helix.manager.zk.ZKHelixDataAccessor; import org.apache.helix.manager.zk.ZKHelixManager; import org.apache.helix.manager.zk.ZNRecordSerializer; import org.apache.helix.manager.zk.ZkBaseDataAccessor; -import org.apache.helix.manager.zk.ZkClient; +import org.apache.helix.manager.zk.client.HelixZkClient; +import org.apache.helix.manager.zk.client.SharedZkClientFactory; import org.apache.helix.model.HelixConfigScope; import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty; import org.apache.helix.model.IdealState; import org.apache.helix.model.MasterSlaveSMD; import org.apache.helix.model.StateModelDefinition; import org.apache.helix.model.builder.HelixConfigScopeBuilder; -import org.apache.helix.store.zk.ZkHelixPropertyStore; -import org.apache.pinot.common.utils.CommonConstants; -import org.apache.pinot.common.utils.helix.HelixHelper; import org.apache.pinot.controller.helix.core.PinotHelixBrokerResourceOnlineOfflineStateModelGenerator; import org.apache.pinot.controller.helix.core.PinotHelixSegmentOnlineOfflineStateModelGenerator; import org.apache.pinot.controller.helix.core.PinotTableIdealStateBuilder; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import static org.apache.pinot.common.utils.CommonConstants.Helix.ENABLE_DELAY_REBALANCE; -import static org.apache.pinot.common.utils.CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME; -import static org.apache.pinot.common.utils.CommonConstants.Helix.MIN_ACTIVE_REPLICAS; -import static org.apache.pinot.common.utils.CommonConstants.Helix.REBALANCE_DELAY_MS; +import static org.apache.pinot.common.utils.CommonConstants.Helix.*; -/** - * HelixSetupUtils handles how to create or get a helixCluster in controller. - * - * - */ public class HelixSetupUtils { + private HelixSetupUtils() { + } + private static final Logger LOGGER = LoggerFactory.getLogger(HelixSetupUtils.class); - public static HelixManager setup(String helixClusterName, String zkPath, String helixControllerInstanceId) { + public static HelixManager setupHelixController(String helixClusterName, String zkPath, String instanceId) { setupHelixClusterIfNeeded(helixClusterName, zkPath); - return startHelixControllerInStandaloneMode(helixClusterName, zkPath, helixControllerInstanceId); + return HelixControllerMain + .startHelixController(zkPath, helixClusterName, instanceId, HelixControllerMain.STANDALONE); } - /** - * Set up a brand new Helix cluster if it doesn't exist. - */ private static void setupHelixClusterIfNeeded(String helixClusterName, String zkPath) { HelixAdmin admin = new ZKHelixAdmin(zkPath); if (admin.getClusters().contains(helixClusterName)) { @@ -90,162 +75,106 @@ public class HelixSetupUtils { } } - private static HelixManager startHelixControllerInStandaloneMode(String helixClusterName, String zkUrl, - String pinotControllerInstanceId) { - LOGGER.info("Starting Helix Standalone Controller ... "); - return HelixControllerMain - .startHelixController(zkUrl, helixClusterName, pinotControllerInstanceId, HelixControllerMain.STANDALONE); - } - - /** - * Customizes existing Helix cluster to run Pinot components. - */ public static void setupPinotCluster(String helixClusterName, String zkPath, boolean isUpdateStateModel, boolean enableBatchMessageMode) { - final HelixAdmin admin = new ZKHelixAdmin(zkPath); - Preconditions.checkState(admin.getClusters().contains(helixClusterName), - String.format("Helix cluster: %s hasn't been set up", helixClusterName)); - - // Add segment state model definition if needed - addSegmentStateModelDefinitionIfNeeded(helixClusterName, admin, zkPath, isUpdateStateModel); - - // Add broker resource if needed - createBrokerResourceIfNeeded(helixClusterName, admin, enableBatchMessageMode); - - // Add lead controller resource if needed - createLeadControllerResourceIfNeeded(helixClusterName, admin, enableBatchMessageMode); - - // Init property store if needed - initPropertyStoreIfNeeded(helixClusterName, zkPath); + HelixZkClient zkClient = null; + try { + zkClient = SharedZkClientFactory.getInstance().buildZkClient(new HelixZkClient.ZkConnectionConfig(zkPath), + new HelixZkClient.ZkClientConfig().setZkSerializer(new ZNRecordSerializer()) + .setConnectInitTimeout(TimeUnit.SECONDS.toMillis(ZkClient.DEFAULT_CONNECT_TIMEOUT_SEC))); + zkClient.waitUntilConnected(ZkClient.DEFAULT_CONNECT_TIMEOUT_SEC, TimeUnit.SECONDS); + HelixAdmin helixAdmin = new ZKHelixAdmin(zkClient); + HelixDataAccessor helixDataAccessor = + new ZKHelixDataAccessor(helixClusterName, new ZkBaseDataAccessor<>(zkClient)); + + Preconditions.checkState(helixAdmin.getClusters().contains(helixClusterName), + String.format("Helix cluster: %s hasn't been set up", helixClusterName)); + + // Add segment state model definition if needed + addSegmentStateModelDefinitionIfNeeded(helixClusterName, helixAdmin, helixDataAccessor, isUpdateStateModel); + + // Add broker resource if needed + createBrokerResourceIfNeeded(helixClusterName, helixAdmin, enableBatchMessageMode); + + // Add lead controller resource if needed + createLeadControllerResourceIfNeeded(helixClusterName, helixAdmin, enableBatchMessageMode); + } finally { + if (zkClient != null) { + zkClient.close(); + } + } } - private static void addSegmentStateModelDefinitionIfNeeded(String helixClusterName, HelixAdmin admin, String zkPath, - boolean isUpdateStateModel) { - final String segmentStateModelName = + private static void addSegmentStateModelDefinitionIfNeeded(String helixClusterName, HelixAdmin helixAdmin, + HelixDataAccessor helixDataAccessor, boolean isUpdateStateModel) { + String segmentStateModelName = PinotHelixSegmentOnlineOfflineStateModelGenerator.PINOT_SEGMENT_ONLINE_OFFLINE_STATE_MODEL; - StateModelDefinition stateModelDefinition = admin.getStateModelDef(helixClusterName, segmentStateModelName); - if (stateModelDefinition == null) { - LOGGER.info("Adding state model {} (with CONSUMED state) generated using {}", segmentStateModelName, - PinotHelixSegmentOnlineOfflineStateModelGenerator.class.toString()); - admin.addStateModelDef(helixClusterName, segmentStateModelName, - PinotHelixSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition()); - } else if (isUpdateStateModel) { - final StateModelDefinition curStateModelDef = admin.getStateModelDef(helixClusterName, segmentStateModelName); - List<String> states = curStateModelDef.getStatesPriorityList(); - if (states.contains(PinotHelixSegmentOnlineOfflineStateModelGenerator.CONSUMING_STATE)) { - LOGGER.info("State model {} already updated to contain CONSUMING state", segmentStateModelName); + StateModelDefinition stateModelDefinition = helixAdmin.getStateModelDef(helixClusterName, segmentStateModelName); + if (stateModelDefinition == null || isUpdateStateModel) { + if (stateModelDefinition == null) { + LOGGER.info("Adding state model: {} with CONSUMING state", segmentStateModelName); } else { - LOGGER.info("Updating {} to add states for low level consumers", segmentStateModelName); - StateModelDefinition newStateModelDef = - PinotHelixSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition(); - ZkClient zkClient = new ZkClient(zkPath); - zkClient.waitUntilConnected(CommonConstants.Helix.ZkClient.DEFAULT_CONNECT_TIMEOUT_SEC, TimeUnit.SECONDS); - zkClient.setZkSerializer(new ZNRecordSerializer()); - HelixDataAccessor accessor = new ZKHelixDataAccessor(helixClusterName, new ZkBaseDataAccessor<>(zkClient)); - PropertyKey.Builder keyBuilder = accessor.keyBuilder(); - accessor.setProperty(keyBuilder.stateModelDef(segmentStateModelName), newStateModelDef); - LOGGER.info("Completed updating state model {}", segmentStateModelName); - zkClient.close(); + LOGGER.info("Updating state model: {} to contain CONSUMING state", segmentStateModelName); } + helixDataAccessor + .createStateModelDef(PinotHelixSegmentOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition()); } } - private static void createBrokerResourceIfNeeded(String helixClusterName, HelixAdmin admin, + private static void createBrokerResourceIfNeeded(String helixClusterName, HelixAdmin helixAdmin, boolean enableBatchMessageMode) { // Add broker resource online offline state model definition if needed - StateModelDefinition brokerResourceStateModelDefinition = admin.getStateModelDef(helixClusterName, + StateModelDefinition brokerResourceStateModelDefinition = helixAdmin.getStateModelDef(helixClusterName, PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.PINOT_BROKER_RESOURCE_ONLINE_OFFLINE_STATE_MODEL); if (brokerResourceStateModelDefinition == null) { LOGGER.info("Adding state model definition named : {} generated using : {}", PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.PINOT_BROKER_RESOURCE_ONLINE_OFFLINE_STATE_MODEL, PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.class.toString()); - admin.addStateModelDef(helixClusterName, + helixAdmin.addStateModelDef(helixClusterName, PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.PINOT_BROKER_RESOURCE_ONLINE_OFFLINE_STATE_MODEL, PinotHelixBrokerResourceOnlineOfflineStateModelGenerator.generatePinotStateModelDefinition()); } // Create broker resource if needed. - IdealState brokerResourceIdealState = - admin.getResourceIdealState(helixClusterName, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE); - if (brokerResourceIdealState == null) { + if (helixAdmin.getResourceIdealState(helixClusterName, BROKER_RESOURCE_INSTANCE) == null) { LOGGER.info("Adding empty ideal state for Broker!"); - HelixHelper - .updateResourceConfigsFor(new HashMap<>(), CommonConstants.Helix.BROKER_RESOURCE_INSTANCE, helixClusterName, - admin); - IdealState idealState = PinotTableIdealStateBuilder - .buildEmptyIdealStateForBrokerResource(admin, helixClusterName, enableBatchMessageMode); - admin.setResourceIdealState(helixClusterName, CommonConstants.Helix.BROKER_RESOURCE_INSTANCE, idealState); + IdealState emptyIdealStateForBrokerResource = PinotTableIdealStateBuilder + .buildEmptyIdealStateForBrokerResource(helixAdmin, helixClusterName, enableBatchMessageMode); + helixAdmin.setResourceIdealState(helixClusterName, BROKER_RESOURCE_INSTANCE, emptyIdealStateForBrokerResource); } } - private static void createLeadControllerResourceIfNeeded(String helixClusterName, HelixAdmin admin, + private static void createLeadControllerResourceIfNeeded(String helixClusterName, HelixAdmin helixAdmin, boolean enableBatchMessageMode) { - StateModelDefinition masterSlaveStateModelDefinition = - admin.getStateModelDef(helixClusterName, MasterSlaveSMD.name); - if (masterSlaveStateModelDefinition == null) { - LOGGER.info("Adding state model definition named : {} generated using : {}", MasterSlaveSMD.name, - MasterSlaveSMD.class.toString()); - admin.addStateModelDef(helixClusterName, MasterSlaveSMD.name, MasterSlaveSMD.build()); - } - - IdealState leadControllerResourceIdealState = - admin.getResourceIdealState(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME); - if (leadControllerResourceIdealState == null) { + if (helixAdmin.getResourceIdealState(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME) == null) { LOGGER.info("Cluster {} doesn't contain {}. Creating one.", helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME); - HelixHelper.updateResourceConfigsFor(new HashMap<>(), LEAD_CONTROLLER_RESOURCE_NAME, helixClusterName, admin); - // FULL-AUTO Master-Slave state model with CrushED reBalance strategy. - admin.addResource(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME, - CommonConstants.Helix.NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE, MasterSlaveSMD.name, - IdealState.RebalanceMode.FULL_AUTO.toString(), CrushEdRebalanceStrategy.class.getName()); + // FULL-AUTO Master-Slave state model with CrushED rebalance strategy. + IdealState leadControllerResourceIdealState = new IdealState(LEAD_CONTROLLER_RESOURCE_NAME); + leadControllerResourceIdealState.setNumPartitions(NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE); + leadControllerResourceIdealState.setReplicas(Integer.toString(0)); + leadControllerResourceIdealState.setStateModelDefRef(MasterSlaveSMD.name); + leadControllerResourceIdealState.setRebalanceMode(IdealState.RebalanceMode.FULL_AUTO); + leadControllerResourceIdealState.setRebalanceStrategy(CrushEdRebalanceStrategy.class.getName()); // Set instance group tag for lead controller resource. - IdealState leadControllerIdealState = - admin.getResourceIdealState(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME); - leadControllerIdealState.setInstanceGroupTag(CommonConstants.Helix.CONTROLLER_INSTANCE_TYPE); - leadControllerIdealState.setBatchMessageMode(enableBatchMessageMode); + leadControllerResourceIdealState.setInstanceGroupTag(CONTROLLER_INSTANCE_TYPE); + leadControllerResourceIdealState.setBatchMessageMode(enableBatchMessageMode); // The below config guarantees if active number of replicas is no less than minimum active replica, there will not be partition movements happened. // Set min active replicas to 0 and rebalance delay to 5 minutes so that if any master goes offline, Helix controller waits at most 5 minutes and then re-calculate the participant assignment. // This delay is helpful when periodic tasks are running and we don't want them to be re-run too frequently. // Plus, if virtual id is applied to controller hosts, swapping hosts would be easy as new hosts can use the same virtual id and it takes least effort to change the configs. - leadControllerIdealState.setMinActiveReplicas(MIN_ACTIVE_REPLICAS); - leadControllerIdealState.setRebalanceDelay(REBALANCE_DELAY_MS); - leadControllerIdealState.setDelayRebalanceEnabled(ENABLE_DELAY_REBALANCE); - admin.setResourceIdealState(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME, leadControllerIdealState); - + leadControllerResourceIdealState.setMinActiveReplicas(MIN_ACTIVE_REPLICAS); + leadControllerResourceIdealState.setRebalanceDelay(REBALANCE_DELAY_MS); + leadControllerResourceIdealState.setDelayRebalanceEnabled(ENABLE_DELAY_REBALANCE); // Explicitly disable this resource when creating this new resource. // When all the controllers are running the code with the logic to handle this resource, it can be enabled for backward compatibility. // In the next major release, we can enable this resource by default, so that all the controller logic can be separated. - admin.enableResource(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME, false); - - LOGGER.info("Re-balance lead controller resource with replicas: {}", - CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_REPLICA_COUNT); - // Set it to 1 so that there's only 1 instance (i.e. master) shown in every partitions. - admin.rebalance(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME, - CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_REPLICA_COUNT); - } - } + // To enable the resource: + // helixAdmin.enableResource(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME, true); + // helixAdmin.rebalance(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME, LEAD_CONTROLLER_RESOURCE_REPLICA_COUNT); + leadControllerResourceIdealState.enable(false); - private static void initPropertyStoreIfNeeded(String helixClusterName, String zkPath) { - String propertyStorePath = PropertyPathBuilder.propertyStore(helixClusterName); - ZkHelixPropertyStore<ZNRecord> propertyStore = - new ZkHelixPropertyStore<>(zkPath, new ZNRecordSerializer(), propertyStorePath); - if (!propertyStore.exists("/CONFIGS", AccessOption.PERSISTENT)) { - propertyStore.create("/CONFIGS", new ZNRecord(""), AccessOption.PERSISTENT); - } - if (!propertyStore.exists("/CONFIGS/CLUSTER", AccessOption.PERSISTENT)) { - propertyStore.create("/CONFIGS/CLUSTER", new ZNRecord(""), AccessOption.PERSISTENT); - } - if (!propertyStore.exists("/CONFIGS/TABLE", AccessOption.PERSISTENT)) { - propertyStore.create("/CONFIGS/TABLE", new ZNRecord(""), AccessOption.PERSISTENT); - } - if (!propertyStore.exists("/CONFIGS/INSTANCE", AccessOption.PERSISTENT)) { - propertyStore.create("/CONFIGS/INSTANCE", new ZNRecord(""), AccessOption.PERSISTENT); - } - if (!propertyStore.exists("/SCHEMAS", AccessOption.PERSISTENT)) { - propertyStore.create("/SCHEMAS", new ZNRecord(""), AccessOption.PERSISTENT); - } - if (!propertyStore.exists("/SEGMENTS", AccessOption.PERSISTENT)) { - propertyStore.create("/SEGMENTS", new ZNRecord(""), AccessOption.PERSISTENT); + helixAdmin.addResource(helixClusterName, LEAD_CONTROLLER_RESOURCE_NAME, leadControllerResourceIdealState); } } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterTest.java index 8f72a59..2382939 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerPeriodicTaskStarterTest.java @@ -30,7 +30,6 @@ import org.testng.annotations.Test; public class ControllerPeriodicTaskStarterTest extends ControllerTest { - private MockControllerStarter _mockControllerStarter; @BeforeClass public void setup() { @@ -51,32 +50,13 @@ public class ControllerPeriodicTaskStarterTest extends ControllerTest { } @Override - protected void startControllerStarter(ControllerConf config) { - _mockControllerStarter = new MockControllerStarter(config); - _mockControllerStarter.start(); - _helixResourceManager = _mockControllerStarter.getHelixResourceManager(); - _helixManager = _mockControllerStarter.getHelixControllerManager(); + protected ControllerStarter getControllerStarter(ControllerConf config) { + return new MockControllerStarter(config); } - @Override - protected void stopControllerStarter() { - Assert.assertNotNull(_mockControllerStarter); - - _mockControllerStarter.stop(); - _mockControllerStarter = null; - } - - @Override - protected ControllerStarter getControllerStarter() { - return _mockControllerStarter; - } - - private class MockControllerStarter extends TestOnlyControllerStarter { - + private class MockControllerStarter extends ControllerStarter { private static final int NUM_PERIODIC_TASKS = 7; - private List<PeriodicTask> _controllerPeriodicTasks; - public MockControllerStarter(ControllerConf conf) { super(conf); } @@ -90,14 +70,10 @@ public class ControllerPeriodicTaskStarterTest extends ControllerTest { Assert.assertNotNull(helixResourceManager.getHelixClusterName()); Assert.assertNotNull(helixResourceManager.getPropertyStore()); - _controllerPeriodicTasks = super.setupControllerPeriodicTasks(); - Assert.assertNotNull(_controllerPeriodicTasks); - Assert.assertEquals(_controllerPeriodicTasks.size(), NUM_PERIODIC_TASKS); - return _controllerPeriodicTasks; - } - - List<PeriodicTask> getControllerPeriodicTasks() { - return _controllerPeriodicTasks; + List<PeriodicTask> controllerPeriodicTasks = super.setupControllerPeriodicTasks(); + Assert.assertNotNull(controllerPeriodicTasks); + Assert.assertEquals(controllerPeriodicTasks.size(), NUM_PERIODIC_TASKS); + return controllerPeriodicTasks; } } } diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java index 9893377..4c5ff81 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/ControllerTest.java @@ -18,6 +18,7 @@ */ package org.apache.pinot.controller.helix; +import com.google.common.base.Preconditions; import java.io.BufferedReader; import java.io.BufferedWriter; import java.io.File; @@ -38,7 +39,6 @@ import org.apache.commons.io.IOUtils; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixManager; import org.apache.helix.ZNRecord; -import org.apache.helix.manager.zk.ZkClient; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.pinot.common.data.DimensionFieldSpec; import org.apache.pinot.common.data.FieldSpec; @@ -55,10 +55,9 @@ import org.testng.Assert; * Base class for controller tests. */ public abstract class ControllerTest { - public static final String LOCAL_HOST = "localhost"; - - private static final int DEFAULT_CONTROLLER_PORT = 18998; - private static final String DEFAULT_DATA_DIR = + protected static final String LOCAL_HOST = "localhost"; + protected static final int DEFAULT_CONTROLLER_PORT = 18998; + protected static final String DEFAULT_DATA_DIR = new File(FileUtils.getTempDirectoryPath(), "test-controller-" + System.currentTimeMillis()).getAbsolutePath(); protected int _controllerPort; @@ -66,7 +65,6 @@ public abstract class ControllerTest { protected ControllerRequestURLBuilder _controllerRequestURLBuilder; protected String _controllerDataDir; - protected ZkClient _zkClient; protected ControllerStarter _controllerStarter; protected PinotHelixResourceManager _helixResourceManager; protected HelixManager _helixManager; @@ -95,58 +93,33 @@ public abstract class ControllerTest { } } - public static ControllerConf getDefaultControllerConfiguration() { + public ControllerConf getDefaultControllerConfiguration() { ControllerConf config = new ControllerConf(); config.setControllerHost(LOCAL_HOST); config.setControllerPort(Integer.toString(DEFAULT_CONTROLLER_PORT)); config.setDataDir(DEFAULT_DATA_DIR); config.setZkStr(ZkStarter.DEFAULT_ZK_STR); + config.setHelixClusterName(getHelixClusterName()); return config; } - public class TestOnlyControllerStarter extends ControllerStarter { - - TestOnlyControllerStarter(ControllerConf conf) { - super(conf); - } - - @Override - public boolean isPinotOnlyModeSupported() { - return true; - } - } - protected void startController() { startController(getDefaultControllerConfiguration()); } protected void startController(ControllerConf config) { - startController(config, true); - } - - protected void startController(ControllerConf config, boolean deleteCluster) { - Assert.assertNotNull(config); - Assert.assertNull(_controllerStarter); + Preconditions.checkState(_controllerStarter == null); _controllerPort = Integer.valueOf(config.getControllerPort()); _controllerBaseApiUrl = "http://localhost:" + _controllerPort; _controllerRequestURLBuilder = ControllerRequestURLBuilder.baseUrl(_controllerBaseApiUrl); _controllerDataDir = config.getDataDir(); - String helixClusterName = getHelixClusterName(); - config.setHelixClusterName(helixClusterName); - - String zkStr = config.getZkStr(); - _zkClient = new ZkClient(zkStr); - if (_zkClient.exists("/" + helixClusterName) && deleteCluster) { - _zkClient.deleteRecursive("/" + helixClusterName); - } - startControllerStarter(config); // HelixResourceManager is null in Helix only mode, while HelixManager is null in Pinot only mode. - switch (getControllerStarter().getControllerMode()) { + switch (_controllerStarter.getControllerMode()) { case DUAL: case PINOT_ONLY: _helixAdmin = _helixResourceManager.getHelixAdmin(); @@ -160,16 +133,19 @@ public abstract class ControllerTest { } protected void startControllerStarter(ControllerConf config) { - _controllerStarter = new TestOnlyControllerStarter(config); + _controllerStarter = getControllerStarter(config); _controllerStarter.start(); _helixResourceManager = _controllerStarter.getHelixResourceManager(); _helixManager = _controllerStarter.getHelixControllerManager(); } + protected ControllerStarter getControllerStarter(ControllerConf config) { + return new ControllerStarter(config); + } + protected void stopController() { stopControllerStarter(); FileUtils.deleteQuietly(new File(_controllerDataDir)); - _zkClient.close(); } protected void stopControllerStarter() { @@ -179,10 +155,6 @@ public abstract class ControllerTest { _controllerStarter = null; } - protected ControllerStarter getControllerStarter() { - return _controllerStarter; - } - protected Schema createDummySchema(String tableName) { Schema schema = new Schema(); schema.setSchemaName(tableName); diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java index d91e612..67ebfe5 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/PinotControllerModeTest.java @@ -18,11 +18,11 @@ */ package org.apache.pinot.controller.helix; +import java.util.Arrays; import java.util.Map; import org.apache.helix.HelixAdmin; import org.apache.helix.HelixManager; import org.apache.helix.model.ExternalView; -import org.apache.pinot.common.utils.CommonConstants; import org.apache.pinot.controller.ControllerConf; import org.apache.pinot.controller.ControllerStarter; import org.apache.pinot.controller.helix.core.PinotHelixResourceManager; @@ -32,183 +32,186 @@ import org.testng.annotations.AfterClass; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; +import static org.apache.pinot.common.utils.CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME; +import static org.apache.pinot.common.utils.CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_REPLICA_COUNT; + public class PinotControllerModeTest extends ControllerTest { private static long TIMEOUT_IN_MS = 10_000L; - private ControllerConf config; - private int controllerPortOffset; @BeforeClass public void setUp() { startZk(); - config = getDefaultControllerConfiguration(); - controllerPortOffset = 0; } @Test - public void testHelixOnlyController() - throws Exception { - config.setControllerMode(ControllerConf.ControllerMode.HELIX_ONLY); - config.setControllerPort(Integer.toString(Integer.parseInt(config.getControllerPort()) + controllerPortOffset++)); - - startController(config); - TestUtils.waitForCondition(aVoid -> _helixManager.isConnected(), TIMEOUT_IN_MS, - "Failed to start " + config.getControllerMode() + " controller in " + TIMEOUT_IN_MS + "ms."); - - Assert.assertEquals(_controllerStarter.getControllerMode(), ControllerConf.ControllerMode.HELIX_ONLY); - - stopController(); - _controllerStarter = null; + public void testHelixOnlyController() { + ControllerConf helixOnlyControllerConfig = getDefaultControllerConfiguration(); + helixOnlyControllerConfig.setControllerMode(ControllerConf.ControllerMode.HELIX_ONLY); + ControllerStarter helixOnlyController = getControllerStarter(helixOnlyControllerConfig); + helixOnlyController.start(); + TestUtils.waitForCondition(aVoid -> helixOnlyController.getHelixControllerManager().isConnected(), TIMEOUT_IN_MS, + "Failed to start the Helix-only controller"); + + helixOnlyController.stop(); } @Test - public void testDualModeController() - throws Exception { - config.setControllerMode(ControllerConf.ControllerMode.DUAL); - config.setControllerPort(Integer.toString(Integer.parseInt(config.getControllerPort()) + controllerPortOffset++)); - + public void testDualModeController() { // Helix cluster will be set up when starting the first controller. - startController(config); - TestUtils.waitForCondition(aVoid -> _helixManager.isConnected(), TIMEOUT_IN_MS, - "Failed to start " + config.getControllerMode() + " controller in " + TIMEOUT_IN_MS + "ms."); - Assert.assertEquals(_controllerStarter.getControllerMode(), ControllerConf.ControllerMode.DUAL); - - // Enable the lead controller resource. - _helixAdmin.enableResource(getHelixClusterName(), CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME, true); + ControllerConf firstDualModeControllerConfig = getDefaultControllerConfiguration(); + firstDualModeControllerConfig.setControllerMode(ControllerConf.ControllerMode.DUAL); + ControllerStarter firstDualModeController = getControllerStarter(firstDualModeControllerConfig); + firstDualModeController.start(); + TestUtils + .waitForCondition(aVoid -> firstDualModeController.getHelixControllerManager().isConnected(), TIMEOUT_IN_MS, + "Failed to start the first dual-mode controller"); // Starting a second dual-mode controller. Helix cluster has already been set up. - ControllerConf controllerConfig = getDefaultControllerConfiguration(); - controllerConfig.setHelixClusterName(getHelixClusterName()); - controllerConfig.setControllerMode(ControllerConf.ControllerMode.DUAL); - controllerConfig.setControllerPort( - Integer.toString(Integer.parseInt(this.config.getControllerPort()) + controllerPortOffset++)); - - ControllerStarter secondDualModeController = new TestOnlyControllerStarter(controllerConfig); + ControllerConf secondDualModeControllerConfig = getDefaultControllerConfiguration(); + secondDualModeControllerConfig.setControllerMode(ControllerConf.ControllerMode.DUAL); + secondDualModeControllerConfig.setControllerPort(Integer.toString(DEFAULT_CONTROLLER_PORT + 1)); + ControllerStarter secondDualModeController = getControllerStarter(secondDualModeControllerConfig); secondDualModeController.start(); TestUtils .waitForCondition(aVoid -> secondDualModeController.getHelixResourceManager().getHelixZkManager().isConnected(), - TIMEOUT_IN_MS, "Failed to start " + config.getControllerMode() + " controller in " + TIMEOUT_IN_MS + "ms."); - Assert.assertEquals(secondDualModeController.getControllerMode(), ControllerConf.ControllerMode.DUAL); + TIMEOUT_IN_MS, "Failed to start the second dual-mode controller"); secondDualModeController.stop(); - stopController(); - _controllerStarter = null; + firstDualModeController.stop(); } // TODO: enable it after removing ControllerLeadershipManager which requires both CONTROLLER and PARTICIPANT // HelixManager @Test(enabled = false) - public void testPinotOnlyController() - throws Exception { - config.setControllerMode(ControllerConf.ControllerMode.PINOT_ONLY); - config.setControllerPort(Integer.toString(Integer.parseInt(config.getControllerPort()) + controllerPortOffset++)); + public void testPinotOnlyController() { + ControllerConf firstPinotOnlyControllerConfig = getDefaultControllerConfiguration(); + firstPinotOnlyControllerConfig.setControllerMode(ControllerConf.ControllerMode.PINOT_ONLY); + ControllerStarter firstPinotOnlyController = getControllerStarter(firstPinotOnlyControllerConfig); // Starting pinot only controller before starting helix controller should fail. try { - startController(config); - Assert.fail("Starting pinot only controller should fail!"); - } catch (RuntimeException e) { - _controllerStarter = null; + firstPinotOnlyController.start(); + Assert.fail("Starting Pinot-only controller without Helix controller should fail"); + } catch (Exception e) { + // Expected } // Starting a helix controller. - ControllerConf config2 = getDefaultControllerConfiguration(); - config2.setHelixClusterName(getHelixClusterName()); - config2.setControllerMode(ControllerConf.ControllerMode.HELIX_ONLY); - config2.setControllerPort(Integer.toString(Integer.parseInt(config.getControllerPort()) + controllerPortOffset++)); - ControllerStarter helixControllerStarter = new ControllerStarter(config2); - helixControllerStarter.start(); - HelixManager helixControllerManager = helixControllerStarter.getHelixControllerManager(); + ControllerConf helixOnlyControllerConfig = getDefaultControllerConfiguration(); + helixOnlyControllerConfig.setControllerMode(ControllerConf.ControllerMode.HELIX_ONLY); + helixOnlyControllerConfig.setControllerPort(Integer.toString(DEFAULT_CONTROLLER_PORT + 1)); + ControllerStarter helixOnlyController = new ControllerStarter(helixOnlyControllerConfig); + helixOnlyController.start(); + HelixManager helixControllerManager = helixOnlyController.getHelixControllerManager(); HelixAdmin helixAdmin = helixControllerManager.getClusterManagmentTool(); TestUtils.waitForCondition(aVoid -> helixControllerManager.isConnected(), TIMEOUT_IN_MS, - "Failed to start " + config2.getControllerMode() + " controller in " + TIMEOUT_IN_MS + "ms."); - - // Enable the lead controller resource. - helixAdmin.enableResource(getHelixClusterName(), CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME, true); + "Failed to start the Helix-only controller"); // Starting a pinot only controller. - ControllerConf config3 = getDefaultControllerConfiguration(); - config3.setHelixClusterName(getHelixClusterName()); - config3.setControllerMode(ControllerConf.ControllerMode.PINOT_ONLY); - config3.setControllerPort(Integer.toString(Integer.parseInt(config.getControllerPort()) + controllerPortOffset++)); - - ControllerStarter firstPinotOnlyController = new TestOnlyControllerStarter(config3); firstPinotOnlyController.start(); - PinotHelixResourceManager firstPinotOnlyPinotHelixResourceManager = - firstPinotOnlyController.getHelixResourceManager(); + PinotHelixResourceManager helixResourceManager = firstPinotOnlyController.getHelixResourceManager(); + TestUtils.waitForCondition(aVoid -> helixResourceManager.getHelixZkManager().isConnected(), TIMEOUT_IN_MS, + "Failed to start the first Pinot-only controller"); - TestUtils.waitForCondition(aVoid -> firstPinotOnlyPinotHelixResourceManager.getHelixZkManager().isConnected(), - TIMEOUT_IN_MS, "Failed to start " + config.getControllerMode() + " controller in " + TIMEOUT_IN_MS + "ms."); - Assert.assertEquals(firstPinotOnlyController.getControllerMode(), ControllerConf.ControllerMode.PINOT_ONLY); + // Enable the lead controller resource. + helixAdmin.enableResource(getHelixClusterName(), LEAD_CONTROLLER_RESOURCE_NAME, true); + helixAdmin.rebalance(getHelixClusterName(), LEAD_CONTROLLER_RESOURCE_NAME, LEAD_CONTROLLER_RESOURCE_REPLICA_COUNT); + TestUtils.waitForCondition(aVoid -> { + ExternalView leadControllerResourceExternalView = + helixAdmin.getResourceExternalView(getHelixClusterName(), LEAD_CONTROLLER_RESOURCE_NAME); + for (String partition : leadControllerResourceExternalView.getPartitionSet()) { + Map<String, String> stateMap = leadControllerResourceExternalView.getStateMap(partition); + if (stateMap.size() != 1 || stateMap.values().contains("MASTER")) { + return false; + } + } + return true; + }, TIMEOUT_IN_MS, "Failed to choose the only participant as MASTER"); // Start a second Pinot only controller. - ControllerConf config4 = getDefaultControllerConfiguration(); - config4.setHelixClusterName(getHelixClusterName()); - config4.setControllerMode(ControllerConf.ControllerMode.PINOT_ONLY); - config4.setControllerPort(Integer.toString(Integer.parseInt(config.getControllerPort()) + controllerPortOffset++)); - - ControllerStarter secondControllerStarter = new TestOnlyControllerStarter(config4); - secondControllerStarter.start(); - // Two controller instances assigned to cluster. - TestUtils - .waitForCondition(aVoid -> firstPinotOnlyPinotHelixResourceManager.getAllInstances().size() == 2, TIMEOUT_IN_MS, - "Failed to start the 2nd pinot only controller in " + TIMEOUT_IN_MS + "ms."); + ControllerConf secondPinotOnlyControllerConfig = getDefaultControllerConfiguration(); + secondPinotOnlyControllerConfig.setControllerMode(ControllerConf.ControllerMode.PINOT_ONLY); + secondPinotOnlyControllerConfig.setControllerPort(Integer.toString(DEFAULT_CONTROLLER_PORT + 2)); + ControllerStarter secondPinotOnlyController = getControllerStarter(secondPinotOnlyControllerConfig); + secondPinotOnlyController.start(); + TestUtils.waitForCondition( + aVoid -> secondPinotOnlyController.getHelixResourceManager().getHelixZkManager().isConnected(), TIMEOUT_IN_MS, + "Failed to start the second Pinot-only controller"); + TestUtils.waitForCondition(aVoid -> { + ExternalView leadControllerResourceExternalView = + helixAdmin.getResourceExternalView(getHelixClusterName(), LEAD_CONTROLLER_RESOURCE_NAME); + for (String partition : leadControllerResourceExternalView.getPartitionSet()) { + Map<String, String> stateMap = leadControllerResourceExternalView.getStateMap(partition); + if (stateMap.size() != 2 || stateMap.values().containsAll(Arrays.asList("MASTER", "SLAVE"))) { + return false; + } + } + return true; + }, TIMEOUT_IN_MS, "Failed to choose two participants as MASTER and SLAVE"); // Disable lead controller resource, all the participants are in offline state (from slave state). - helixAdmin.enableResource(getHelixClusterName(), CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME, false); - + helixAdmin.enableResource(getHelixClusterName(), LEAD_CONTROLLER_RESOURCE_NAME, false); TestUtils.waitForCondition(aVoid -> { - ExternalView leadControllerResourceExternalView = firstPinotOnlyPinotHelixResourceManager.getHelixAdmin() - .getResourceExternalView(getHelixClusterName(), CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME); + ExternalView leadControllerResourceExternalView = + helixAdmin.getResourceExternalView(getHelixClusterName(), LEAD_CONTROLLER_RESOURCE_NAME); for (String partition : leadControllerResourceExternalView.getPartitionSet()) { Map<String, String> stateMap = leadControllerResourceExternalView.getStateMap(partition); - for (Map.Entry<String, String> entry : stateMap.entrySet()) { - if (!"OFFLINE".equals(entry.getValue())) { + if (stateMap.size() != 2) { + return false; + } + for (String value : stateMap.values()) { + if (!value.equals("OFFLINE")) { return false; } } } return true; - }, TIMEOUT_IN_MS, "Failed to mark all the participants offline in " + TIMEOUT_IN_MS + "ms."); + }, TIMEOUT_IN_MS, "Failed to turn all the participants OFFLINE"); - // Re-enable lead controller resource, all the participants are in healthy state (either master or slave). - helixAdmin.enableResource(getHelixClusterName(), CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME, true); + // Re-enable lead controller resource, all the participants are in healthy state (either MASTER or SLAVE). + helixAdmin.enableResource(getHelixClusterName(), LEAD_CONTROLLER_RESOURCE_NAME, true); + TestUtils.waitForCondition(aVoid -> { + ExternalView leadControllerResourceExternalView = + helixAdmin.getResourceExternalView(getHelixClusterName(), LEAD_CONTROLLER_RESOURCE_NAME); + for (String partition : leadControllerResourceExternalView.getPartitionSet()) { + Map<String, String> stateMap = leadControllerResourceExternalView.getStateMap(partition); + if (stateMap.size() != 2 || stateMap.values().containsAll(Arrays.asList("MASTER", "SLAVE"))) { + return false; + } + } + return true; + }, TIMEOUT_IN_MS, "Failed to choose two participants as MASTER and SLAVE from OFFLINE"); // Shutdown one controller, it will be removed from external view of lead controller resource. - secondControllerStarter.stop(); - + secondPinotOnlyController.stop(); TestUtils.waitForCondition(aVoid -> { - ExternalView leadControllerResourceExternalView = firstPinotOnlyPinotHelixResourceManager.getHelixAdmin() - .getResourceExternalView(getHelixClusterName(), CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME); + ExternalView leadControllerResourceExternalView = + helixAdmin.getResourceExternalView(getHelixClusterName(), LEAD_CONTROLLER_RESOURCE_NAME); for (String partition : leadControllerResourceExternalView.getPartitionSet()) { Map<String, String> stateMap = leadControllerResourceExternalView.getStateMap(partition); - // Only 1 participant left in each partition, which will become the master. - for (Map.Entry<String, String> entry : stateMap.entrySet()) { - if (!"MASTER".equals(entry.getValue())) { - return false; - } + if (stateMap.size() != 1 || stateMap.values().contains("MASTER")) { + return false; } } return true; - }, TIMEOUT_IN_MS, "Failed to mark all the participants MASTER in " + TIMEOUT_IN_MS + "ms."); + }, TIMEOUT_IN_MS, "Failed to drop the first disconnected participant"); // Shutdown the only one controller left, the partition map should be empty. firstPinotOnlyController.stop(); TestUtils.waitForCondition(aVoid -> { - ExternalView leadControllerResourceExternalView = helixAdmin - .getResourceExternalView(getHelixClusterName(), CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME); + ExternalView leadControllerResourceExternalView = + helixAdmin.getResourceExternalView(getHelixClusterName(), LEAD_CONTROLLER_RESOURCE_NAME); for (String partition : leadControllerResourceExternalView.getPartitionSet()) { Map<String, String> stateMap = leadControllerResourceExternalView.getStateMap(partition); - // There's no participant in all the partitions. if (!stateMap.isEmpty()) { return false; } } return true; - }, TIMEOUT_IN_MS, "Failed to have all the partitions empty in " + TIMEOUT_IN_MS + "ms."); + }, TIMEOUT_IN_MS, "Failed to drop the second disconnected participant"); - _controllerStarter = null; - helixControllerStarter.stop(); + helixOnlyController.stop(); } @AfterClass diff --git a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java index a850ac4..21226a4 100644 --- a/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java +++ b/pinot-controller/src/test/java/org/apache/pinot/controller/helix/core/PinotHelixResourceManagerTest.java @@ -62,8 +62,11 @@ import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeClass; import org.testng.annotations.Test; -import static org.apache.pinot.common.utils.CommonConstants.Helix.*; -import static org.apache.pinot.controller.helix.core.PinotHelixResourceManager.*; +import static org.apache.pinot.common.utils.CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME; +import static org.apache.pinot.common.utils.CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_REPLICA_COUNT; +import static org.apache.pinot.common.utils.CommonConstants.Helix.NUMBER_OF_PARTITIONS_IN_LEAD_CONTROLLER_RESOURCE; +import static org.apache.pinot.common.utils.CommonConstants.Helix.PREFIX_OF_CONTROLLER_INSTANCE; +import static org.apache.pinot.controller.helix.core.PinotHelixResourceManager.InvalidTableConfigException; public class PinotHelixResourceManagerTest extends ControllerTest { @@ -82,28 +85,33 @@ public class PinotHelixResourceManagerTest extends ControllerTest { private final String _helixClusterName = getHelixClusterName(); @BeforeClass - public void setUp() throws Exception { + public void setUp() + throws Exception { startZk(); ControllerConf config = getDefaultControllerConfiguration(); config.setTenantIsolationEnabled(false); startController(config); - ControllerRequestBuilderUtil.addFakeBrokerInstancesToAutoJoinHelixCluster(_helixClusterName, - ZkStarter.DEFAULT_ZK_STR, NUM_INSTANCES, false); - ControllerRequestBuilderUtil.addFakeDataInstancesToAutoJoinHelixCluster(_helixClusterName, ZkStarter.DEFAULT_ZK_STR, - NUM_INSTANCES, false, BASE_SERVER_ADMIN_PORT); + ControllerRequestBuilderUtil + .addFakeBrokerInstancesToAutoJoinHelixCluster(_helixClusterName, ZkStarter.DEFAULT_ZK_STR, NUM_INSTANCES, + false); + ControllerRequestBuilderUtil + .addFakeDataInstancesToAutoJoinHelixCluster(_helixClusterName, ZkStarter.DEFAULT_ZK_STR, NUM_INSTANCES, false, + BASE_SERVER_ADMIN_PORT); // Create server tenant on all Servers - Tenant serverTenant = new Tenant.TenantBuilder(SERVER_TENANT_NAME).setRole(TenantRole.SERVER) - .setOfflineInstances(NUM_INSTANCES) - .build(); + Tenant serverTenant = + new Tenant.TenantBuilder(SERVER_TENANT_NAME).setRole(TenantRole.SERVER).setOfflineInstances(NUM_INSTANCES) + .build(); _helixResourceManager.createServerTenant(serverTenant); _helixAdmin.enableResource(getHelixClusterName(), LEAD_CONTROLLER_RESOURCE_NAME, true); + _helixAdmin.rebalance(getHelixClusterName(), LEAD_CONTROLLER_RESOURCE_NAME, LEAD_CONTROLLER_RESOURCE_REPLICA_COUNT); } @Test - public void testGetInstanceEndpoints() throws InvalidConfigException { + public void testGetInstanceEndpoints() + throws InvalidConfigException { Set<String> servers = _helixResourceManager.getAllInstancesForServerTenant(SERVER_TENANT_NAME); BiMap<String, String> endpoints = _helixResourceManager.getDataInstanceAdminEndpoints(servers); for (int i = 0; i < NUM_INSTANCES; i++) { @@ -112,7 +120,8 @@ public class PinotHelixResourceManagerTest extends ControllerTest { } @Test - public void testGetInstanceConfigs() throws Exception { + public void testGetInstanceConfigs() + throws Exception { Set<String> servers = _helixResourceManager.getAllInstancesForServerTenant(SERVER_TENANT_NAME); for (String server : servers) { InstanceConfig cachedInstanceConfig = _helixResourceManager.getHelixInstanceConfig(server); @@ -129,7 +138,8 @@ public class PinotHelixResourceManagerTest extends ControllerTest { zkClient.close(); } - private void modifyExistingInstanceConfig(ZkClient zkClient) throws InterruptedException { + private void modifyExistingInstanceConfig(ZkClient zkClient) + throws InterruptedException { String instanceName = "Server_localhost_" + new Random().nextInt(NUM_INSTANCES); String instanceConfigPath = PropertyPathBuilder.instanceConfig(_helixClusterName, instanceName); Assert.assertTrue(zkClient.exists(instanceConfigPath)); @@ -160,7 +170,8 @@ public class PinotHelixResourceManagerTest extends ControllerTest { zkClient.writeData(instanceConfigPath, znRecord); } - private void addAndRemoveNewInstanceConfig(ZkClient zkClient) throws Exception { + private void addAndRemoveNewInstanceConfig(ZkClient zkClient) + throws Exception { int biggerRandomNumber = NUM_INSTANCES + new Random().nextInt(NUM_INSTANCES); String instanceName = "Server_localhost_" + String.valueOf(biggerRandomNumber); String instanceConfigPath = PropertyPathBuilder.instanceConfig(_helixClusterName, instanceName); @@ -193,18 +204,16 @@ public class PinotHelixResourceManagerTest extends ControllerTest { } @Test - public void testRebuildBrokerResourceFromHelixTags() throws Exception { + public void testRebuildBrokerResourceFromHelixTags() + throws Exception { // Create broker tenant on 3 Brokers Tenant brokerTenant = new Tenant.TenantBuilder(BROKER_TENANT_NAME).setRole(TenantRole.BROKER).setTotalInstances(3).build(); _helixResourceManager.createBrokerTenant(brokerTenant); // Create the table - TableConfig tableConfig = new TableConfig.Builder(TableType.OFFLINE).setTableName(TABLE_NAME) - .setNumReplicas(3) - .setBrokerTenant(BROKER_TENANT_NAME) - .setServerTenant(SERVER_TENANT_NAME) - .build(); + TableConfig tableConfig = new TableConfig.Builder(TableType.OFFLINE).setTableName(TABLE_NAME).setNumReplicas(3) + .setBrokerTenant(BROKER_TENANT_NAME).setServerTenant(SERVER_TENANT_NAME).build(); _helixResourceManager.addTable(tableConfig); // Check that the BrokerResource ideal state has 3 Brokers assigned to the table @@ -214,8 +223,8 @@ public class PinotHelixResourceManagerTest extends ControllerTest { // Untag all Brokers assigned to broker tenant for (String brokerInstance : _helixResourceManager.getAllInstancesForBrokerTenant(BROKER_TENANT_NAME)) { - _helixAdmin.removeInstanceTag(_helixClusterName, brokerInstance, - TagNameUtils.getBrokerTagForTenant(BROKER_TENANT_NAME)); + _helixAdmin + .removeInstanceTag(_helixClusterName, brokerInstance, TagNameUtils.getBrokerTagForTenant(BROKER_TENANT_NAME)); _helixAdmin.addInstanceTag(_helixClusterName, brokerInstance, CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE); } @@ -238,7 +247,7 @@ public class PinotHelixResourceManagerTest extends ControllerTest { } @Test - public void testRetrieveMetadata() throws Exception { + public void testRetrieveMetadata() { String segmentName = "testSegment"; // Test retrieving OFFLINE segment ZK metadata @@ -463,8 +472,7 @@ public class PinotHelixResourceManagerTest extends ControllerTest { } @Test - public void testLeadControllerResource() - throws Exception { + public void testLeadControllerResource() { IdealState leadControllerResourceIdealState = _helixResourceManager.getHelixAdmin() .getResourceIdealState(getHelixClusterName(), CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME); Assert.assertTrue(leadControllerResourceIdealState.isValid()); @@ -476,25 +484,23 @@ public class PinotHelixResourceManagerTest extends ControllerTest { Assert.assertEquals(leadControllerResourceIdealState.getReplicas(), Integer.toString(LEAD_CONTROLLER_RESOURCE_REPLICA_COUNT)); Assert.assertEquals(leadControllerResourceIdealState.getRebalanceMode(), IdealState.RebalanceMode.FULL_AUTO); - Assert.assertTrue(leadControllerResourceIdealState.getInstanceSet( - leadControllerResourceIdealState.getPartitionSet().iterator().next()).isEmpty()); - - TestUtils - .waitForCondition(aVoid -> { - ExternalView leadControllerResourceExternalView = _helixResourceManager.getHelixAdmin() - .getResourceExternalView(getHelixClusterName(), CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME); - for (String partition : leadControllerResourceExternalView.getPartitionSet()) { - Map<String, String> stateMap = leadControllerResourceExternalView.getStateMap(partition); - Map.Entry<String, String> entry = stateMap.entrySet().iterator().next(); - boolean result = (PREFIX_OF_CONTROLLER_INSTANCE + LOCAL_HOST + "_" + _controllerPort).equals(entry.getKey()); - result &= "MASTER".equals(entry.getValue()); - if (!result) { - return false; - } - } - return true; - }, - TIMEOUT_IN_MS, "Failed to assign controller hosts to lead controller resource in " + TIMEOUT_IN_MS + " ms."); + Assert.assertTrue(leadControllerResourceIdealState + .getInstanceSet(leadControllerResourceIdealState.getPartitionSet().iterator().next()).isEmpty()); + + TestUtils.waitForCondition(aVoid -> { + ExternalView leadControllerResourceExternalView = _helixResourceManager.getHelixAdmin() + .getResourceExternalView(getHelixClusterName(), CommonConstants.Helix.LEAD_CONTROLLER_RESOURCE_NAME); + for (String partition : leadControllerResourceExternalView.getPartitionSet()) { + Map<String, String> stateMap = leadControllerResourceExternalView.getStateMap(partition); + Map.Entry<String, String> entry = stateMap.entrySet().iterator().next(); + boolean result = (PREFIX_OF_CONTROLLER_INSTANCE + LOCAL_HOST + "_" + _controllerPort).equals(entry.getKey()); + result &= "MASTER".equals(entry.getValue()); + if (!result) { + return false; + } + } + return true; + }, TIMEOUT_IN_MS, "Failed to assign controller hosts to lead controller resource in " + TIMEOUT_IN_MS + " ms."); } @Test @@ -536,9 +542,8 @@ public class PinotHelixResourceManagerTest extends ControllerTest { + port + "}{}{TAG_LIST=[controller]}")); } clusterDataCache.setInstanceConfigMap(instanceConfigMap); - ZNRecord znRecord = - crushEdRebalanceStrategy.computePartitionAssignment(instanceNames, instanceNames, new HashMap<>(0), - clusterDataCache); + ZNRecord znRecord = crushEdRebalanceStrategy + .computePartitionAssignment(instanceNames, instanceNames, new HashMap<>(0), clusterDataCache); Assert.assertNotNull(znRecord); Map<String, List<String>> listFields = znRecord.getListFields(); @@ -551,8 +556,8 @@ public class PinotHelixResourceManagerTest extends ControllerTest { if (!instanceToMasterAssignmentCountMap.containsKey(assignments.get(0))) { instanceToMasterAssignmentCountMap.put(assignments.get(0), 1); } else { - instanceToMasterAssignmentCountMap.put(assignments.get(0), - instanceToMasterAssignmentCountMap.get(assignments.get(0)) + 1); + instanceToMasterAssignmentCountMap + .put(assignments.get(0), instanceToMasterAssignmentCountMap.get(assignments.get(0)) + 1); } maxCount = Math.max(instanceToMasterAssignmentCountMap.get(assignments.get(0)), maxCount); } @@ -568,8 +573,8 @@ public class PinotHelixResourceManagerTest extends ControllerTest { public void cleanUpBrokerTags() { // Untag all Brokers for other tests for (String brokerInstance : _helixResourceManager.getAllInstancesForBrokerTenant(BROKER_TENANT_NAME)) { - _helixAdmin.removeInstanceTag(_helixClusterName, brokerInstance, - TagNameUtils.getBrokerTagForTenant(BROKER_TENANT_NAME)); + _helixAdmin + .removeInstanceTag(_helixClusterName, brokerInstance, TagNameUtils.getBrokerTagForTenant(BROKER_TENANT_NAME)); _helixAdmin.addInstanceTag(_helixClusterName, brokerInstance, CommonConstants.Helix.UNTAGGED_BROKER_INSTANCE); } } diff --git a/pinot-core/src/test/java/org/apache/pinot/util/TestUtils.java b/pinot-core/src/test/java/org/apache/pinot/util/TestUtils.java index e1fdb14..176c120 100644 --- a/pinot-core/src/test/java/org/apache/pinot/util/TestUtils.java +++ b/pinot-core/src/test/java/org/apache/pinot/util/TestUtils.java @@ -140,35 +140,32 @@ public class TestUtils { * @param timeoutMs Timeout in milliseconds * @param errorMessage Error message if condition is not met before timed out */ - public static void waitForCondition(@Nonnull Function<Void, Boolean> condition, long checkIntervalMs, long timeoutMs, - @Nullable String errorMessage) - throws Exception { + public static void waitForCondition(Function<Void, Boolean> condition, long checkIntervalMs, long timeoutMs, + @Nullable String errorMessage) { long endTime = System.currentTimeMillis() + timeoutMs; - try { - while (System.currentTimeMillis() < endTime) { - Boolean isConditionMet = condition.apply(null); - if ((isConditionMet != null) && isConditionMet) { + while (System.currentTimeMillis() < endTime) { + try { + if (Boolean.TRUE.equals(condition.apply(null))) { return; } Thread.sleep(checkIntervalMs); + } catch (Exception e) { + if (errorMessage != null) { + Assert.fail("Caught exception while checking the condition, error message: " + errorMessage, e); + } else { + Assert.fail("Caught exception while checking the condition", e); + } } - if (errorMessage != null) { - Assert.fail("Failed to meet condition in " + timeoutMs + "ms, error message: " + errorMessage); - } else { - Assert.fail("Failed to meet condition in " + timeoutMs + "ms"); - } - } catch (Exception e) { - if (errorMessage != null) { - Assert.fail("Caught exception while checking the condition, error message: " + errorMessage, e); - } else { - Assert.fail("Caught exception while checking the condition", e); - } + } + if (errorMessage != null) { + Assert.fail("Failed to meet condition in " + timeoutMs + "ms, error message: " + errorMessage); + } else { + Assert.fail("Failed to meet condition in " + timeoutMs + "ms"); } } - public static void waitForCondition(@Nonnull Function<Void, Boolean> condition, long timeoutMs, - @Nullable String errorMessage) - throws Exception { + public static void waitForCondition(Function<Void, Boolean> condition, long timeoutMs, + @Nullable String errorMessage) { waitForCondition(condition, 1000L, timeoutMs, errorMessage); } } --------------------------------------------------------------------- To unsubscribe, e-mail: [email protected] For additional commands, e-mail: [email protected]
