Repository: helix Updated Branches: refs/heads/master baf8b830e -> 07e05a4c1
RoutingTableProvider for TargetExternalView Implement the RoutingTableProvide for TargetExternalView subscription. Write several tests for it. Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/07e05a4c Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/07e05a4c Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/07e05a4c Branch: refs/heads/master Commit: 07e05a4c1c456b7e09c7750955fa46c34e1a288a Parents: baf8b83 Author: Junkai Xue <[email protected]> Authored: Mon Feb 5 17:09:28 2018 -0800 Committer: Lei Xia <[email protected]> Committed: Thu Mar 15 11:03:11 2018 -0700 ---------------------------------------------------------------------- .../java/org/apache/helix/HelixConstants.java | 36 +++-- .../helix/common/BasicClusterDataCache.java | 20 ++- .../helix/spectator/RoutingDataCache.java | 6 +- .../helix/spectator/RoutingTableProvider.java | 44 +++++- .../TestRoutingTableProviderWithSourceType.java | 152 +++++++++++++++++++ 5 files changed, 233 insertions(+), 25 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/07e05a4c/helix-core/src/main/java/org/apache/helix/HelixConstants.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/HelixConstants.java b/helix-core/src/main/java/org/apache/helix/HelixConstants.java index 6935a23..f84173b 100644 --- a/helix-core/src/main/java/org/apache/helix/HelixConstants.java +++ b/helix-core/src/main/java/org/apache/helix/HelixConstants.java @@ -26,20 +26,30 @@ public interface HelixConstants { // TODO: ChangeType and PropertyType are duplicated, consider unifying enum ChangeType { // @formatter:off - IDEAL_STATE, - CONFIG, - INSTANCE_CONFIG, - RESOURCE_CONFIG, - CLUSTER_CONFIG, - LIVE_INSTANCE, - CURRENT_STATE, - MESSAGE, - EXTERNAL_VIEW, - TARGET_EXTERNAL_VIEW, - CONTROLLER, - MESSAGES_CONTROLLER, - HEALTH + IDEAL_STATE (PropertyType.IDEALSTATES), + CONFIG (PropertyType.CONFIGS), + INSTANCE_CONFIG (PropertyType.CONFIGS), + RESOURCE_CONFIG (PropertyType.CONFIGS), + CLUSTER_CONFIG (PropertyType.CONFIGS), + LIVE_INSTANCE (PropertyType.LIVEINSTANCES), + CURRENT_STATE (PropertyType.CURRENTSTATES), + MESSAGE (PropertyType.MESSAGES), + EXTERNAL_VIEW (PropertyType.EXTERNALVIEW), + TARGET_EXTERNAL_VIEW (PropertyType.TARGETEXTERNALVIEW), + CONTROLLER (PropertyType.CONTROLLER), + MESSAGES_CONTROLLER (PropertyType.MESSAGES_CONTROLLER), + HEALTH (PropertyType.HEALTHREPORT); // @formatter:on + + private final PropertyType _propertyType; + + ChangeType(PropertyType propertyType) { + _propertyType = propertyType; + } + + public PropertyType getPropertyType() { + return _propertyType; + } } /** http://git-wip-us.apache.org/repos/asf/helix/blob/07e05a4c/helix-core/src/main/java/org/apache/helix/common/BasicClusterDataCache.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/common/BasicClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/common/BasicClusterDataCache.java index 994ebfb..d48cfbb 100644 --- a/helix-core/src/main/java/org/apache/helix/common/BasicClusterDataCache.java +++ b/helix-core/src/main/java/org/apache/helix/common/BasicClusterDataCache.java @@ -25,7 +25,9 @@ import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import org.apache.helix.HelixConstants; import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixException; import org.apache.helix.PropertyKey; +import org.apache.helix.PropertyType; import org.apache.helix.model.ExternalView; import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.LiveInstance; @@ -41,16 +43,23 @@ public class BasicClusterDataCache { private Map<String, LiveInstance> _liveInstanceMap; private Map<String, InstanceConfig> _instanceConfigMap; private Map<String, ExternalView> _externalViewMap; + private final PropertyType _sourceDataType; + protected String _clusterName; protected Map<HelixConstants.ChangeType, Boolean> _propertyDataChangedMap; public BasicClusterDataCache(String clusterName) { + this(clusterName, PropertyType.EXTERNALVIEW); + } + + public BasicClusterDataCache(String clusterName, PropertyType sourceDataType) { _propertyDataChangedMap = new ConcurrentHashMap<>(); _liveInstanceMap = new HashMap<>(); _instanceConfigMap = new HashMap<>(); _externalViewMap = new HashMap<>(); _clusterName = clusterName; + _sourceDataType = sourceDataType; } /** @@ -68,7 +77,16 @@ public class BasicClusterDataCache { if (_propertyDataChangedMap.get(HelixConstants.ChangeType.EXTERNAL_VIEW)) { long start = System.currentTimeMillis(); _propertyDataChangedMap.put(HelixConstants.ChangeType.EXTERNAL_VIEW, Boolean.valueOf(false)); - _externalViewMap = accessor.getChildValuesMap(keyBuilder.externalViews()); + switch (_sourceDataType) { + case EXTERNALVIEW: + _externalViewMap = accessor.getChildValuesMap(keyBuilder.externalViews()); + break; + case TARGETEXTERNALVIEW: + _externalViewMap = accessor.getChildValuesMap(keyBuilder.targetExternalViews()); + break; + default: + break; + } if (LOG.isDebugEnabled()) { LOG.debug("Reload ExternalViews: " + _externalViewMap.keySet() + ". Takes " + ( System.currentTimeMillis() - start) + " ms"); http://git-wip-us.apache.org/repos/asf/helix/blob/07e05a4c/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java index a754f55..5602333 100644 --- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java +++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java @@ -20,15 +20,15 @@ package org.apache.helix.spectator; */ import org.apache.helix.HelixConstants; +import org.apache.helix.PropertyType; import org.apache.helix.common.BasicClusterDataCache; /** * Cache the cluster data that are needed by RoutingTableProvider. */ public class RoutingDataCache extends BasicClusterDataCache { - - public RoutingDataCache(String clusterName) { - super(clusterName); + public RoutingDataCache(String clusterName, PropertyType sourceDataType) { + super(clusterName, sourceDataType); requireFullRefresh(); } http://git-wip-us.apache.org/repos/asf/helix/blob/07e05a4c/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java index a89636b..cd4e3d2 100644 --- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java +++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java @@ -27,6 +27,7 @@ import java.util.concurrent.atomic.AtomicReference; import org.apache.helix.HelixConstants; import org.apache.helix.HelixException; import org.apache.helix.HelixManager; +import org.apache.helix.PropertyType; import org.apache.helix.api.listeners.ConfigChangeListener; import org.apache.helix.api.listeners.InstanceConfigChangeListener; import org.apache.helix.api.listeners.ExternalViewChangeListener; @@ -51,19 +52,41 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc private final AtomicReference<RoutingTable> _routingTableRef; private final HelixManager _helixManager; private final RouterUpdater _routerUpdater; + private final PropertyType _sourceDataType; public RoutingTableProvider() { this(null); } public RoutingTableProvider(HelixManager helixManager) throws HelixException { + this(helixManager, PropertyType.EXTERNALVIEW); + } + + public RoutingTableProvider(HelixManager helixManager, PropertyType sourceDataType) throws HelixException { _routingTableRef = new AtomicReference<>(new RoutingTable()); _helixManager = helixManager; - String clusterName = null; + _sourceDataType = sourceDataType; + String clusterName = _helixManager != null ? _helixManager.getClusterName() : null; + _routerUpdater = new RouterUpdater(clusterName, _sourceDataType); + _routerUpdater.start(); if (_helixManager != null) { - clusterName = _helixManager.getClusterName(); try { - _helixManager.addExternalViewChangeListener(this); + switch (_sourceDataType) { + case EXTERNALVIEW: + _helixManager.addExternalViewChangeListener(this); + break; + case TARGETEXTERNALVIEW: + // Check whether target external has been enabled or not + if (!_helixManager.getHelixDataAccessor().getBaseDataAccessor().exists( + _helixManager.getHelixDataAccessor().keyBuilder().targetExternalViews().getPath(), + 0)) { + throw new HelixException("Target External View is not enabled!"); + } + _helixManager.addTargetExternalViewChangeListener(this); + break; + default: + throw new HelixException("Unsupported source data type: " + sourceDataType); + } _helixManager.addInstanceConfigChangeListener(this); _helixManager.addLiveInstanceChangeListener(this); } catch (Exception e) { @@ -71,8 +94,6 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc throw new HelixException("Failed to attach listeners to HelixManager!", e); } } - _routerUpdater = new RouterUpdater(clusterName); - _routerUpdater.start(); } /** @@ -188,7 +209,8 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc */ public Set<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName, String state, List<String> resourceTags) { - return _routingTableRef.get().getInstancesForResourceGroup(resourceGroupName, state, resourceTags); + return _routingTableRef.get().getInstancesForResourceGroup(resourceGroupName, state, + resourceTags); } /** @@ -211,6 +233,12 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc @PreFetch(enabled = false) public void onExternalViewChange(List<ExternalView> externalViewList, NotificationContext changeContext) { + HelixConstants.ChangeType changeType = changeContext.getChangeType(); + if (changeType != null && !changeType.getPropertyType().equals(_sourceDataType)) { + logger.warn("onExternalViewChange called with dis-matched change types. Source data type " + + _sourceDataType + ", changed data type: " + changeType); + return; + } // Refresh with full list of external view. // keep this here for back-compatibility if (externalViewList != null && externalViewList.size() > 0) { @@ -268,9 +296,9 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc private class RouterUpdater extends ClusterEventProcessor { private final RoutingDataCache _dataCache; - public RouterUpdater(String clusterName) { + public RouterUpdater(String clusterName, PropertyType sourceDataType) { super("Helix-RouterUpdater-event_process"); - _dataCache = new RoutingDataCache(clusterName); + _dataCache = new RoutingDataCache(clusterName, sourceDataType); } @Override http://git-wip-us.apache.org/repos/asf/helix/blob/07e05a4c/helix-core/src/test/java/org/apache/helix/integration/Spectator/TestRoutingTableProviderWithSourceType.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/Spectator/TestRoutingTableProviderWithSourceType.java b/helix-core/src/test/java/org/apache/helix/integration/Spectator/TestRoutingTableProviderWithSourceType.java new file mode 100644 index 0000000..ddd27e3 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/Spectator/TestRoutingTableProviderWithSourceType.java @@ -0,0 +1,152 @@ +package org.apache.helix.integration.Spectator; + +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.apache.helix.ConfigAccessor; +import org.apache.helix.HelixException; +import org.apache.helix.HelixManager; +import org.apache.helix.HelixManagerFactory; +import org.apache.helix.InstanceType; +import org.apache.helix.PropertyType; +import org.apache.helix.integration.common.ZkIntegrationTestBase; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.integration.task.WorkflowGenerator; +import org.apache.helix.mock.participant.MockDelayMSStateModelFactory; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.participant.StateMachineEngine; +import org.apache.helix.spectator.RoutingTableProvider; +import org.apache.helix.tools.ClusterSetup; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class TestRoutingTableProviderWithSourceType extends ZkIntegrationTestBase { + private HelixManager _manager; + private ClusterSetup _setupTool; + private final String MASTER_SLAVE_STATE_MODEL = "MasterSlave"; + private final int NUM_NODES = 10; + protected int NUM_PARTITIONS = 20; + protected int NUM_REPLICAS = 3; + private final int START_PORT = 12918; + private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + getShortClassName(); + private MockParticipantManager[] _participants; + private ClusterControllerManager _controller; + private ConfigAccessor _configAccessor; + + @BeforeClass + public void beforeClass() throws Exception { + String namespace = "/" + CLUSTER_NAME; + _participants = new MockParticipantManager[NUM_NODES]; + if (_gZkClient.exists(namespace)) { + _gZkClient.deleteRecursively(namespace); + } + + _setupTool = new ClusterSetup(ZK_ADDR); + _setupTool.addCluster(CLUSTER_NAME, true); + + _participants = new MockParticipantManager[NUM_NODES]; + for (int i = 0; i < NUM_NODES; i++) { + String storageNodeName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + _setupTool.addInstanceToCluster(CLUSTER_NAME, storageNodeName); + } + + _setupTool.addResourceToCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, NUM_PARTITIONS, + MASTER_SLAVE_STATE_MODEL, IdealState.RebalanceMode.FULL_AUTO.name()); + + _setupTool + .rebalanceStorageCluster(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB, NUM_REPLICAS); + + for (int i = 0; i < NUM_NODES; i++) { + String instanceName = PARTICIPANT_PREFIX + "_" + (START_PORT + i); + _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName); + + // add a delayed state model + StateMachineEngine stateMachine = _participants[i].getStateMachineEngine(); + MockDelayMSStateModelFactory delayFactory = + new MockDelayMSStateModelFactory().setDelay(-300000L); + stateMachine.registerStateModelFactory(MASTER_SLAVE_STATE_MODEL, delayFactory); + _participants[i].syncStart(); + } + + _manager = HelixManagerFactory + .getZKHelixManager(CLUSTER_NAME, "Admin", InstanceType.ADMINISTRATOR, ZK_ADDR); + _manager.connect(); + + String controllerName = CONTROLLER_PREFIX + "_0"; + _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); + _controller.syncStart(); + _configAccessor = new ConfigAccessor(_gZkClient); + } + + @AfterClass + public void afterClass() throws Exception { + _manager.disconnect(); + for (int i = 0; i < NUM_NODES; i++) { + if (_participants[i] != null && _participants[i].isConnected()) { + _participants[i].reset(); + } + } + } + + @Test (expectedExceptions = HelixException.class) + public void testTargetExternalViewWithoutEnable() { + new RoutingTableProvider(_manager, PropertyType.TARGETEXTERNALVIEW); + } + + @Test + public void testExternalViewDoesNotExist() { + String resourceName = WorkflowGenerator.DEFAULT_TGT_DB + 1; + RoutingTableProvider externalViewProvider = + new RoutingTableProvider(_manager, PropertyType.EXTERNALVIEW); + Assert.assertEquals(externalViewProvider.getInstancesForResource(resourceName, "SLAVE").size(), + 0); + } + + @Test (dependsOnMethods = "testTargetExternalViewWithoutEnable") + public void testExternalViewDiffFromTargetExternalView() throws InterruptedException { + ClusterConfig clusterConfig = _configAccessor.getClusterConfig(CLUSTER_NAME); + clusterConfig.enableTargetExternalView(true); + clusterConfig.setPersistBestPossibleAssignment(true); + _configAccessor.setClusterConfig(CLUSTER_NAME, clusterConfig); + Thread.sleep(2000); + + RoutingTableProvider externalViewProvider = + new RoutingTableProvider(_manager, PropertyType.EXTERNALVIEW); + RoutingTableProvider targetExternalViewProvider = + new RoutingTableProvider(_manager, PropertyType.TARGETEXTERNALVIEW); + + // ExternalView should not contain any MASTERS + // TargetExternalView should contain MASTERS same as the partition number + Set<InstanceConfig> externalViewMasters = + externalViewProvider.getInstancesForResource(WorkflowGenerator.DEFAULT_TGT_DB, "MASTER"); + Assert.assertEquals(externalViewMasters.size(), 0); + Set<InstanceConfig> targetExternalViewMasters = targetExternalViewProvider + .getInstancesForResource(WorkflowGenerator.DEFAULT_TGT_DB, "MASTER"); + Assert.assertEquals(targetExternalViewMasters.size(), NUM_NODES); + + // TargetExternalView MASTERS mapping should exactly match IdealState MASTERS mapping + Map<String, Map<String, String>> stateMap = _setupTool.getClusterManagementTool() + .getResourceIdealState(CLUSTER_NAME, WorkflowGenerator.DEFAULT_TGT_DB).getRecord() + .getMapFields(); + + Set<String> idealMasters = new HashSet<>(); + Set<String> targetMasters = new HashSet<>(); + for (Map<String, String> instanceMap : stateMap.values()) { + for (String instance : instanceMap.keySet()) { + if (instanceMap.get(instance).equals("MASTER")) { + idealMasters.add(instance); + } + } + } + + for (InstanceConfig instanceConfig : targetExternalViewMasters) { + targetMasters.add(instanceConfig.getInstanceName()); + } + Assert.assertTrue(idealMasters.equals(targetMasters)); + } +}
