Repository: helix Updated Branches: refs/heads/helix-0.6.x 2775e1566 -> a23beb7cf
[HELIX-599] Support creating/maintaining/routing resources with same names in different instance groups. Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/a23beb7c Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/a23beb7c Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/a23beb7c Branch: refs/heads/helix-0.6.x Commit: a23beb7cf79a3f1da104a55477c7eddb594fa68b Parents: 2775e15 Author: Lei Xia <[email protected]> Authored: Mon May 11 10:54:27 2015 -0700 Committer: Lei Xia <[email protected]> Committed: Thu Jul 9 22:37:54 2015 -0700 ---------------------------------------------------------------------- .../stages/MessageGenerationPhase.java | 23 +- .../stages/ResourceComputationStage.java | 8 + .../apache/helix/manager/zk/ZKHelixAdmin.java | 7 + .../org/apache/helix/model/ExternalView.java | 36 ++ .../java/org/apache/helix/model/IdealState.java | 45 +- .../java/org/apache/helix/model/Message.java | 38 ++ .../java/org/apache/helix/model/Resource.java | 35 ++ .../helix/spectator/RoutingTableProvider.java | 304 ++++++++++-- .../org/apache/helix/tools/ClusterSetup.java | 82 +++- .../integration/TestResourceGroupEndtoEnd.java | 465 +++++++++++++++++++ 10 files changed, 989 insertions(+), 54 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/a23beb7c/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java index bc3c739..2e919f8 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java @@ 
-70,7 +70,6 @@ public class MessageGenerationPhase extends AbstractBaseStage { for (String resourceName : resourceMap.keySet()) { Resource resource = resourceMap.get(resourceName); - int bucketSize = resource.getBucketSize(); StateModelDefinition stateModelDef = cache.getStateModelDef(resource.getStateModelDefRef()); @@ -125,9 +124,8 @@ public class MessageGenerationPhase extends AbstractBaseStage { } else { Message message = - createMessage(manager, resourceName, partition.getPartitionName(), instanceName, - currentState, nextState, sessionIdMap.get(instanceName), stateModelDef.getId(), - resource.getStateModelFactoryname(), bucketSize); + createMessage(manager, resource, partition.getPartitionName(), instanceName, + currentState, nextState, sessionIdMap.get(instanceName), stateModelDef.getId()); IdealState idealState = cache.getIdealState(resourceName); if (idealState != null @@ -188,23 +186,30 @@ public class MessageGenerationPhase extends AbstractBaseStage { event.addAttribute(AttributeName.MESSAGES_ALL.toString(), output); } - private Message createMessage(HelixManager manager, String resourceName, String partitionName, + private Message createMessage(HelixManager manager, Resource resource, String partitionName, String instanceName, String currentState, String nextState, String sessionId, - String stateModelDefName, String stateModelFactoryName, int bucketSize) { + String stateModelDefName) { String uuid = UUID.randomUUID().toString(); Message message = new Message(MessageType.STATE_TRANSITION, uuid); message.setSrcName(manager.getInstanceName()); message.setTgtName(instanceName); message.setMsgState(MessageState.NEW); message.setPartitionName(partitionName); - message.setResourceName(resourceName); + message.setResourceName(resource.getResourceName()); message.setFromState(currentState); message.setToState(nextState); message.setTgtSessionId(sessionId); message.setSrcSessionId(manager.getSessionId()); message.setStateModelDef(stateModelDefName); - 
message.setStateModelFactoryName(stateModelFactoryName); - message.setBucketSize(bucketSize); + message.setStateModelFactoryName(resource.getStateModelFactoryname()); + message.setBucketSize(resource.getBucketSize()); + + if (resource.getResourceGroupName() != null) { + message.setResourceGroupName(resource.getResourceGroupName()); + } + if (resource.getResourceTag() != null) { + message.setResourceTag(resource.getResourceTag()); + } return message; } http://git-wip-us.apache.org/repos/asf/helix/blob/a23beb7c/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java index 5676098..bde2904 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ResourceComputationStage.java @@ -62,6 +62,8 @@ public class ResourceComputationStage extends AbstractBaseStage { resource.setStateModelFactoryName(idealState.getStateModelFactoryName()); resource.setBucketSize(idealState.getBucketSize()); resource.setBatchMessageMode(idealState.getBatchMessageMode()); + resource.setResourceGroupName(idealState.getResourceGroupName()); + resource.setResourceTag(idealState.getInstanceGroupTag()); } for (String partition : partitionSet) { @@ -102,6 +104,12 @@ public class ResourceComputationStage extends AbstractBaseStage { resource.setStateModelFactoryName(currentState.getStateModelFactoryName()); resource.setBucketSize(currentState.getBucketSize()); resource.setBatchMessageMode(currentState.getBatchMessageMode()); + + IdealState idealState = idealStates.get(resourceName); + if (idealState != null) { + resource.setResourceGroupName(idealState.getResourceGroupName()); + 
resource.setResourceTag(idealState.getInstanceGroupTag()); + } } if (currentState.getStateModelDefRef() == null) { http://git-wip-us.apache.org/repos/asf/helix/blob/a23beb7c/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java index ecf84f8..e97ac9b 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java @@ -404,6 +404,13 @@ public class ZKHelixAdmin implements HelixAdmin { message.setToState(stateModel.getInitialState()); message.setStateModelFactoryName(idealState.getStateModelFactoryName()); + if (idealState.getResourceGroupName() != null) { + message.setResourceGroupName(idealState.getResourceGroupName()); + } + if (idealState.getInstanceGroupTag() != null) { + message.setResourceTag(idealState.getInstanceGroupTag()); + } + resetMessages.add(message); messageKeys.add(keyBuilder.message(instanceName, message.getId())); } http://git-wip-us.apache.org/repos/asf/helix/blob/a23beb7c/helix-core/src/main/java/org/apache/helix/model/ExternalView.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/model/ExternalView.java b/helix-core/src/main/java/org/apache/helix/model/ExternalView.java index d5f1afc..7b201b0 100644 --- a/helix-core/src/main/java/org/apache/helix/model/ExternalView.java +++ b/helix-core/src/main/java/org/apache/helix/model/ExternalView.java @@ -31,6 +31,16 @@ import org.apache.helix.ZNRecord; * of current states for the partitions in a resource */ public class ExternalView extends HelixProperty { + + /** + * Properties that are persisted and are queryable for an external view + */ + public enum ExternalViewProperty { + 
INSTANCE_GROUP_TAG, + RESOURCE_GROUP_NAME, + GROUP_ROUTING_ENABLED + } + /** * Instantiate an external view with the resource it corresponds to * @param resource the name of the resource @@ -95,6 +105,32 @@ public class ExternalView extends HelixProperty { return _record.getId(); } + /** + * Get the resource group name + * + * @return the name of the resource group this resource belongs to. + */ + public String getResourceGroupName() { + return _record.getSimpleField(ExternalViewProperty.RESOURCE_GROUP_NAME.toString()); + } + + /** + * Check whether the group routing is enabled for this resource. + * + * @return true if the group routing enabled for this resource; false otherwise + */ + public boolean isGroupRoutingEnabled() { + return _record.getBooleanField(ExternalViewProperty.GROUP_ROUTING_ENABLED.name(), false); + } + + /** + * Check for a group tag of this resource + * @return the group tag, or null if none is present + */ + public String getInstanceGroupTag() { + return _record.getSimpleField(ExternalViewProperty.INSTANCE_GROUP_TAG.toString()); + } + @Override public boolean isValid() { return true; http://git-wip-us.apache.org/repos/asf/helix/blob/a23beb7c/helix-core/src/main/java/org/apache/helix/model/IdealState.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/model/IdealState.java b/helix-core/src/main/java/org/apache/helix/model/IdealState.java index bc31e1e..d2744ac 100644 --- a/helix-core/src/main/java/org/apache/helix/model/IdealState.java +++ b/helix-core/src/main/java/org/apache/helix/model/IdealState.java @@ -55,7 +55,9 @@ public class IdealState extends HelixProperty { MAX_PARTITIONS_PER_INSTANCE, INSTANCE_GROUP_TAG, REBALANCER_CLASS_NAME, - HELIX_ENABLED + HELIX_ENABLED, + RESOURCE_GROUP_NAME, + GROUP_ROUTING_ENABLED } public static final String QUERY_LIST = "PREFERENCE_LIST_QUERYS"; @@ -113,7 +115,7 @@ public class IdealState extends HelixProperty { } /** - * Get 
the rebalance mode of the ideal state + * Set the rebalance mode of the ideal state * @param mode {@link IdealStateModeProperty} */ @Deprecated @@ -124,7 +126,7 @@ public class IdealState extends HelixProperty { } /** - * Get the rebalance mode of the resource + * Set the rebalance mode of the resource * @param rebalancerType */ public void setRebalanceMode(RebalanceMode rebalancerType) { @@ -160,6 +162,43 @@ public class IdealState extends HelixProperty { } /** + * Set the resource group name + * @param resourceGroupName + */ + public void setResourceGroupName(String resourceGroupName) { + _record.setSimpleField(IdealStateProperty.RESOURCE_GROUP_NAME.toString(), resourceGroupName); + } + + /** + * Get the resource group name + * + * @return + */ + public String getResourceGroupName() { + return _record.getSimpleField(IdealStateProperty.RESOURCE_GROUP_NAME.toString()); + } + + /** + * Get if the resource group routing feature is enabled or not + * By default, it's disabled + * + * @return true if enabled; false otherwise + */ + public boolean isResourceGroupEnabled() { + return _record.getBooleanField(IdealStateProperty.GROUP_ROUTING_ENABLED.name(), false); + } + + /** + * Enable/Disable the aggregated routing on resource group. 
+ * + * @param enabled + */ + public void enableGroupRouting(boolean enabled) { + _record.setSimpleField(IdealStateProperty.GROUP_ROUTING_ENABLED.name(), + Boolean.toString(enabled)); + } + + /** * Set the maximum number of partitions of this resource that an instance can serve * @param max the maximum number of partitions supported */ http://git-wip-us.apache.org/repos/asf/helix/blob/a23beb7c/helix-core/src/main/java/org/apache/helix/model/Message.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/model/Message.java b/helix-core/src/main/java/org/apache/helix/model/Message.java index 937a28e..9fed87b 100644 --- a/helix-core/src/main/java/org/apache/helix/model/Message.java +++ b/helix-core/src/main/java/org/apache/helix/model/Message.java @@ -64,6 +64,8 @@ public class Message extends HelixProperty { MSG_STATE, PARTITION_NAME, RESOURCE_NAME, + RESOURCE_GROUP_NAME, + RESOURCE_TAG, FROM_STATE, TO_STATE, STATE_MODEL_DEF, @@ -397,6 +399,42 @@ public class Message extends HelixProperty { } /** + * Set the resource group associated with this message + * + * @param resourceGroupName resource group name to set + */ + public void setResourceGroupName(String resourceGroupName) { + _record.setSimpleField(Attributes.RESOURCE_GROUP_NAME.toString(), resourceGroupName); + } + + /** + * Get the resource group name associated with this message + * + * @return resource group name + */ + public String getResourceGroupName() { + return _record.getSimpleField(Attributes.RESOURCE_GROUP_NAME.toString()); + } + + /** + * Set the resource tag associated with this message + * + * @param resourceTag resource tag to set + */ + public void setResourceTag(String resourceTag) { + _record.setSimpleField(Attributes.RESOURCE_TAG.toString(), resourceTag); + } + + /** + * Get the resource tag associated with this message + * + * @return resource tag + */ + public String getResourceTag() { + return 
_record.getSimpleField(Attributes.RESOURCE_TAG.toString()); + } + + /** * Get the resource partition associated with this message * @return partition name */ http://git-wip-us.apache.org/repos/asf/helix/blob/a23beb7c/helix-core/src/main/java/org/apache/helix/model/Resource.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/model/Resource.java b/helix-core/src/main/java/org/apache/helix/model/Resource.java index 1544514..7a22686 100644 --- a/helix-core/src/main/java/org/apache/helix/model/Resource.java +++ b/helix-core/src/main/java/org/apache/helix/model/Resource.java @@ -38,6 +38,8 @@ public class Resource { private String _stateModelFactoryName; private int _bucketSize = 0; private boolean _batchMessageMode = false; + private String _resourceGroupName; + private String _resourceTag; /** * Instantiate a resource by its name @@ -149,6 +151,39 @@ public class Resource { return _batchMessageMode; } + /** + * Get the resource tag assigned to this resource + * + * @return the name of the tag + */ + public String getResourceTag() { + return _resourceTag; + } + + /** + * Set the resource tag + * @param resourceTag + */ + public void setResourceTag(String resourceTag) { + _resourceTag = resourceTag; + } + + /** + * Get resource group name + * @return the resource group name + */ + public String getResourceGroupName() { + return _resourceGroupName; + } + + /** + * Set resource group name + * @param resourceGroupName + */ + public void setResourceGroupName(String resourceGroupName) { + _resourceGroupName = resourceGroupName; + } + @Override public String toString() { StringBuilder sb = new StringBuilder(); http://git-wip-us.apache.org/repos/asf/helix/blob/a23beb7c/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java ---------------------------------------------------------------------- diff --git 
a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java index 9bba660..bd2b44d 100644 --- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java +++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.Collections; import java.util.Comparator; import java.util.HashMap; +import java.util.HashSet; import java.util.List; import java.util.Map; import java.util.Set; @@ -50,6 +51,9 @@ public class RoutingTableProvider implements ExternalViewChangeListener, ConfigC /** * returns the instances for {resource,partition} pair that are in a specific * {state} + * + * This method will be deprecated, please use the + * {@link #getInstancesForResource(String, String, String)} getInstancesForResource} method. * @param resourceName * - * @param partitionName @@ -57,6 +61,19 @@ public class RoutingTableProvider implements ExternalViewChangeListener, ConfigC * @return empty list if there is no instance in a given state */ public List<InstanceConfig> getInstances(String resourceName, String partitionName, String state) { + return getInstancesForResource(resourceName, partitionName, state); + } + + /** + * returns the instances for {resource,partition} pair that are in a specific + * {state} + * @param resourceName + * - + * @param partitionName + * @param state + * @return empty list if there is no instance in a given state + */ + public List<InstanceConfig> getInstancesForResource(String resourceName, String partitionName, String state) { List<InstanceConfig> instanceList = null; RoutingTable _routingTable = _routingTableRef.get(); ResourceInfo resourceInfo = _routingTable.get(resourceName); @@ -73,15 +90,93 @@ public class RoutingTableProvider implements ExternalViewChangeListener, ConfigC } /** + * returns the instances for {resource group,partition} pair in all 
resources belonging to the given + * resource group that are in a specific {state}. + * + * The returned results aggregate all partition states from all the resources in the given resource + * group. + * + * @param resourceGroupName + * @param partitionName + * @param state + * + * @return empty list if there is no instance in a given state + */ + public List<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName, + String partitionName, String state) { + List<InstanceConfig> instanceList = null; + RoutingTable _routingTable = _routingTableRef.get(); + ResourceGroupInfo resourceGroupInfo = _routingTable.getResourceGroup(resourceGroupName); + if (resourceGroupInfo != null) { + PartitionInfo keyInfo = resourceGroupInfo.get(partitionName); + if (keyInfo != null) { + instanceList = keyInfo.get(state); + } + } + if (instanceList == null) { + instanceList = Collections.emptyList(); + } + return instanceList; + } + + /** + * returns the instances for {resource group,partition} pair with any of the given tags + * that are in a specific {state}. + * + * Find all resources belonging to the given resource group that have any of the given resource tags + * and return the aggregated partition states from all these resources. 
+ * + * @param resourceGroupName + * @param partitionName + * @param state + * @param resourceTags + * + * @return empty list if there is no instance in a given state + */ + public List<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName, String partitionName, + String state, List<String> resourceTags) { + RoutingTable _routingTable = _routingTableRef.get(); + ResourceGroupInfo resourceGroupInfo = _routingTable.getResourceGroup(resourceGroupName); + List<InstanceConfig> instanceList = null; + if (resourceGroupInfo != null) { + instanceList = new ArrayList<InstanceConfig>(); + for (String tag : resourceTags) { + PartitionInfo keyInfo = resourceGroupInfo.get(partitionName, tag); + if (keyInfo != null && keyInfo.containsState(state)) { + instanceList.addAll(keyInfo.get(state)); + } + } + } + if (instanceList == null) { + return Collections.emptyList(); + } + + return instanceList; + } + + /** * returns all instances for {resource} that are in a specific {state} - * @param resource + * + * This method will be deprecated, please use the + * {@link #getInstancesForResource(String, String) getInstancesForResource} method. + * @param resourceName * @param state * @return empty list if there is no instance in a given state */ - public Set<InstanceConfig> getInstances(String resource, String state) { + public Set<InstanceConfig> getInstances(String resourceName, String state) { + return getInstancesForResource(resourceName, state); + } + + /** + * returns all instances for {resource} that are in a specific {state}. 
+ * @param resourceName + * @param state + * @return empty list if there is no instance in a given state + */ + public Set<InstanceConfig> getInstancesForResource(String resourceName, String state) { Set<InstanceConfig> instanceSet = null; RoutingTable routingTable = _routingTableRef.get(); - ResourceInfo resourceInfo = routingTable.get(resource); + ResourceInfo resourceInfo = routingTable.get(resourceName); if (resourceInfo != null) { instanceSet = resourceInfo.getInstances(state); } @@ -91,6 +186,56 @@ public class RoutingTableProvider implements ExternalViewChangeListener, ConfigC return instanceSet; } + /** + * returns all instances for all resources in {resource group} that are in a specific {state} + * + * @param resourceGroupName + * @param state + * + * @return empty list if there is no instance in a given state + */ + public Set<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName, String state) { + Set<InstanceConfig> instanceSet = null; + RoutingTable _routingTable = _routingTableRef.get(); + ResourceGroupInfo resourceGroupInfo = _routingTable.getResourceGroup(resourceGroupName); + if (resourceGroupInfo != null) { + instanceSet = resourceGroupInfo.getInstances(state); + } + if (instanceSet == null) { + instanceSet = Collections.emptySet(); + } + return instanceSet; + } + + /** + * returns all instances for resources contains any given tags in {resource group} that are in a + * specific {state} + * + * @param resourceGroupName + * @param state + * + * @return empty list if there is no instance in a given state + */ + public Set<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName, String state, + List<String> resourceTags) { + Set<InstanceConfig> instanceSet = null; + RoutingTable _routingTable = _routingTableRef.get(); + ResourceGroupInfo resourceGroupInfo = _routingTable.getResourceGroup(resourceGroupName); + if (resourceGroupInfo != null) { + instanceSet = new HashSet<InstanceConfig>(); + for (String tag : 
resourceTags) { + Set<InstanceConfig> instances = resourceGroupInfo.getInstances(state, tag); + if (instances != null) { + instanceSet.addAll(resourceGroupInfo.getInstances(state, tag)); + } + } + } + if (instanceSet == null) { + return Collections.emptySet(); + } + return instanceSet; + } + @Override public void onExternalViewChange(List<ExternalView> externalViewList, NotificationContext changeContext) { @@ -139,12 +284,16 @@ public class RoutingTableProvider implements ExternalViewChangeListener, ConfigC String currentState = stateMap.get(instanceName); if (instanceConfigMap.containsKey(instanceName)) { InstanceConfig instanceConfig = instanceConfigMap.get(instanceName); - newRoutingTable.addEntry(resourceName, partitionName, currentState, instanceConfig); + if (extView.isGroupRoutingEnabled()) { + newRoutingTable.addEntry(resourceName, extView.getResourceGroupName(), + extView.getInstanceGroupTag(), partitionName, currentState, instanceConfig); + } else { + newRoutingTable.addEntry(resourceName, partitionName, currentState, instanceConfig); + } } else { logger.error("Invalid instance name." + instanceName + " .Not found in /cluster/configs/. 
instanceName: "); } - } } } @@ -153,10 +302,15 @@ public class RoutingTableProvider implements ExternalViewChangeListener, ConfigC } class RoutingTable { - private final HashMap<String, ResourceInfo> resourceInfoMap; + // mapping a resourceName to the ResourceInfo + private final Map<String, ResourceInfo> resourceInfoMap; + + // mapping a resource group name to a resourceGroupInfo + private final Map<String, ResourceGroupInfo> resourceGroupInfoMap; public RoutingTable() { - resourceInfoMap = new HashMap<String, RoutingTableProvider.ResourceInfo>(); + resourceInfoMap = new HashMap<String, ResourceInfo>(); + resourceGroupInfoMap = new HashMap<String, ResourceGroupInfo>(); } public void addEntry(String resourceName, String partitionName, String state, @@ -166,20 +320,64 @@ public class RoutingTableProvider implements ExternalViewChangeListener, ConfigC } ResourceInfo resourceInfo = resourceInfoMap.get(resourceName); resourceInfo.addEntry(partitionName, state, config); + } + + /** + * add an entry with a resource with resourceGrouping enabled. 
+ */ + public void addEntry(String resourceName, String resourceGroupName, String resourceTag, + String partitionName, String state, InstanceConfig config) { + addEntry(resourceName, partitionName, state, config); + if (!resourceGroupInfoMap.containsKey(resourceGroupName)) { + resourceGroupInfoMap.put(resourceGroupName, new ResourceGroupInfo()); + } + + ResourceGroupInfo resourceGroupInfo = resourceGroupInfoMap.get(resourceGroupName); + resourceGroupInfo.addEntry(resourceTag, partitionName, state, config); } ResourceInfo get(String resourceName) { return resourceInfoMap.get(resourceName); } + ResourceGroupInfo getResourceGroup(String resourceGroupName) { + return resourceGroupInfoMap.get(resourceGroupName); + } } + private static Comparator<InstanceConfig> INSTANCE_CONFIG_COMPARATOR = + new Comparator<InstanceConfig>() { + @Override + public int compare(InstanceConfig o1, InstanceConfig o2) { + if (o1 == o2) { + return 0; + } + if (o1 == null) { + return -1; + } + if (o2 == null) { + return 1; + } + + int compareTo = o1.getHostName().compareTo(o2.getHostName()); + if (compareTo == 0) { + return o1.getPort().compareTo(o2.getPort()); + } else { + return compareTo; + } + + } + }; + + /** + * Class to store instances, partitions and their states for each resource. 
+ */ class ResourceInfo { // store PartitionInfo for each partition - HashMap<String, PartitionInfo> partitionInfoMap; + Map<String, PartitionInfo> partitionInfoMap; // stores the Set of Instances in a given state - HashMap<String, Set<InstanceConfig>> stateInfoMap; + Map<String, Set<InstanceConfig>> stateInfoMap; public ResourceInfo() { partitionInfoMap = new HashMap<String, RoutingTableProvider.PartitionInfo>(); @@ -189,30 +387,7 @@ public class RoutingTableProvider implements ExternalViewChangeListener, ConfigC public void addEntry(String stateUnitKey, String state, InstanceConfig config) { // add if (!stateInfoMap.containsKey(state)) { - Comparator<InstanceConfig> comparator = new Comparator<InstanceConfig>() { - - @Override - public int compare(InstanceConfig o1, InstanceConfig o2) { - if (o1 == o2) { - return 0; - } - if (o1 == null) { - return -1; - } - if (o2 == null) { - return 1; - } - - int compareTo = o1.getHostName().compareTo(o2.getHostName()); - if (compareTo == 0) { - return o1.getPort().compareTo(o2.getPort()); - } else { - return compareTo; - } - - } - }; - stateInfoMap.put(state, new TreeSet<InstanceConfig>(comparator)); + stateInfoMap.put(state, new TreeSet<InstanceConfig>(INSTANCE_CONFIG_COMPARATOR)); } Set<InstanceConfig> set = stateInfoMap.get(state); set.add(config); @@ -222,7 +397,6 @@ public class RoutingTableProvider implements ExternalViewChangeListener, ConfigC } PartitionInfo stateUnitKeyInfo = partitionInfoMap.get(stateUnitKey); stateUnitKeyInfo.addEntry(state, config); - } public Set<InstanceConfig> getInstances(String state) { @@ -235,8 +409,64 @@ public class RoutingTableProvider implements ExternalViewChangeListener, ConfigC } } + /** + * Class to store instances, partitions and their states for each resource group. + */ + class ResourceGroupInfo { + // aggregated partitions and instances info for all resources in the resource group. 
+ ResourceInfo aggregatedResourceInfo; + + // <ResourceTag, ResourceInfo> maps resource tag to the resource with the tag + // in this resource group. + // Each ResourceInfo saves only partitions and instances for that resource. + Map<String, ResourceInfo> tagToResourceMap; + + public ResourceGroupInfo() { + aggregatedResourceInfo = new ResourceInfo(); + tagToResourceMap = new HashMap<String, ResourceInfo>(); + } + + public void addEntry(String resourceTag, String stateUnitKey, String state, InstanceConfig config) { + // add the new entry to the aggregated resource info + aggregatedResourceInfo.addEntry(stateUnitKey, state, config); + + // add the entry to the resourceInfo with given tag + if (!tagToResourceMap.containsKey(resourceTag)) { + tagToResourceMap.put(resourceTag, new ResourceInfo()); + } + ResourceInfo resourceInfo = tagToResourceMap.get(resourceTag); + resourceInfo.addEntry(stateUnitKey, state, config); + } + + public Set<InstanceConfig> getInstances(String state) { + return aggregatedResourceInfo.getInstances(state); + } + + public Set<InstanceConfig> getInstances(String state, String resourceTag) { + ResourceInfo resourceInfo = tagToResourceMap.get(resourceTag); + if (resourceInfo != null) { + return resourceInfo.getInstances(state); + } + + return null; + } + + PartitionInfo get(String stateUnitKey) { + return aggregatedResourceInfo.get(stateUnitKey); + } + + PartitionInfo get(String stateUnitKey, String resourceTag) { + ResourceInfo resourceInfo = tagToResourceMap.get(resourceTag); + if (resourceInfo == null) { + return null; + } + + return resourceInfo.get(stateUnitKey); + } + } + class PartitionInfo { - HashMap<String, List<InstanceConfig>> stateInfoMap; + Map<String, List<InstanceConfig>> stateInfoMap; public PartitionInfo() { stateInfoMap = new HashMap<String, List<InstanceConfig>>(); @@ -253,5 +483,9 @@ public class RoutingTableProvider implements ExternalViewChangeListener, ConfigC List<InstanceConfig> get(String state) { return 
stateInfoMap.get(state); } + + boolean containsState(String state) { + return stateInfoMap.containsKey(state); + } } } http://git-wip-us.apache.org/repos/asf/helix/blob/a23beb7c/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java index 37c4915..9d411bb 100644 --- a/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java +++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterSetup.java @@ -37,6 +37,7 @@ import org.apache.commons.cli.OptionGroup; import org.apache.commons.cli.Options; import org.apache.commons.cli.ParseException; import org.apache.helix.HelixAdmin; +import org.apache.helix.HelixConstants; import org.apache.helix.HelixConstants.StateModelToken; import org.apache.helix.HelixException; import org.apache.helix.PropertyKey.Builder; @@ -57,6 +58,7 @@ import org.apache.helix.model.IdealState; import org.apache.helix.model.IdealState.RebalanceMode; import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.LiveInstance; +import org.apache.helix.model.OnlineOfflineSMD; import org.apache.helix.model.StateModelDefinition; import org.apache.helix.model.builder.ConstraintItemBuilder; import org.apache.helix.model.builder.HelixConfigScopeBuilder; @@ -220,6 +222,10 @@ public class ClusterSetup { _admin.addInstance(clusterName, config); } + public void addInstanceTag(String clusterName, String instanceName, String tag) { + _admin.addInstanceTag(clusterName, instanceName, tag); + } + public void dropInstancesFromCluster(String clusterName, String[] instanceInfoArray) { for (String instanceInfo : instanceInfoArray) { if (instanceInfo.length() > 0) { @@ -340,6 +346,10 @@ public class ClusterSetup { _admin.addStateModelDef(clusterName, stateModelDef, record, overwritePrevious); } + public void 
addResourceToCluster(String clusterName, String resourceName, IdealState idealState) { + _admin.addResource(clusterName, resourceName, idealState); + } + public void addResourceToCluster(String clusterName, String resourceName, int numResources, String stateModelRef) { addResourceToCluster(clusterName, resourceName, numResources, stateModelRef, @@ -363,6 +373,50 @@ public class ClusterSetup { bucketSize, maxPartitionsPerInstance); } + + /** + * Get the mangled IdealState name if resourceGroup/resourceTag is enabled. + */ + public static String genIdealStateNameWithResourceTag(String resourceName, String resourceTag) { + return resourceName + "$" + resourceTag; + } + + /** + * Create an IdealState for a resource that belongs to a resource group. We use + * "resourceGroupName$resourceInstanceTag" as the IdealState znode name to differentiate different + * resources from the same resourceGroup. + */ + public IdealState createIdealStateForResourceGroup(String resourceGroupName, + String resourceTag, int numPartition, int replica, String rebalanceMode, String stateModelDefName) { + String idealStateId = genIdealStateNameWithResourceTag(resourceGroupName, resourceTag); + IdealState idealState = new IdealState(idealStateId); + idealState.setNumPartitions(numPartition); + idealState.setStateModelDefRef(stateModelDefName); + IdealState.RebalanceMode mode = + idealState.rebalanceModeFromString(rebalanceMode, IdealState.RebalanceMode.SEMI_AUTO); + idealState.setRebalanceMode(mode); + idealState.setReplicas("" + replica); + idealState.setStateModelFactoryName(HelixConstants.DEFAULT_STATE_MODEL_FACTORY); + idealState.setResourceGroupName(resourceGroupName); + idealState.setInstanceGroupTag(resourceTag); + idealState.enableGroupRouting(true); + + return idealState; + } + + /** + * Enable or disable a resource within a resource group associated with a given resource tag + * + * @param clusterName + * @param resourceName + * @param resourceTag + */ + public void enableResource(String 
clusterName, String resourceName, String resourceTag, + boolean enabled) { + String idealStateId = genIdealStateNameWithResourceTag(resourceName, resourceTag); + _admin.enableResource(clusterName, idealStateId, enabled); + } + public void dropResourceFromCluster(String clusterName, String resourceName) { _admin.dropResource(clusterName, resourceName); } @@ -448,7 +502,7 @@ public class ClusterSetup { /** * set configs * @param type config-scope type, e.g. CLUSTER, RESOURCE, etc. - * @param scopesStr scopeArgsCsv csv-formatted scope-args, e.g myCluster,testDB + * @param scopeArgsCsv csv-formatted scope-args, e.g. myCluster,testDB * @param keyValuePairs csv-formatted key-value pairs. e.g. k1=v1,k2=v2 */ public void setConfig(ConfigScopeProperty type, String scopeArgsCsv, String keyValuePairs) { @@ -463,7 +517,7 @@ public class ClusterSetup { /** * remove configs * @param type config-scope type, e.g. CLUSTER, RESOURCE, etc. - * @param scopesStr scopeArgsCsv csv-formatted scope-args, e.g myCluster,testDB + * @param scopeArgsCsv csv-formatted scope-args, e.g. myCluster,testDB * @param keysCsv csv-formatted keys. e.g. 
k1,k2 */ public void removeConfig(ConfigScopeProperty type, String scopeArgsCsv, String keysCsv) { @@ -616,7 +670,7 @@ public class ClusterSetup { .create(); listInstancesOption.setArgs(1); listInstancesOption.setRequired(false); - listInstancesOption.setArgName("clusterName"); + listInstancesOption.setArgName("clusterName <-tag tagName>"); Option addClusterOption = OptionBuilder.withLongOpt(addCluster).withDescription("Add a new cluster").create(); @@ -747,7 +801,8 @@ public class ClusterSetup { Option enableResourceOption = OptionBuilder.withLongOpt(enableResource).withDescription("Enable/disable a resource") - .hasArgs(3).isRequired(false).withArgName("clusterName resourceName true/false") + .hasArgs(3).isRequired(false) + .withArgName("clusterName resourceName true/false <-tag resourceTag>") .create(); Option rebalanceOption = @@ -1146,9 +1201,17 @@ public class ClusterSetup { return 0; } else if (cmd.hasOption(listInstances)) { String clusterName = cmd.getOptionValue(listInstances); - List<String> instances = - setupTool.getClusterManagementTool().getInstancesInCluster(clusterName); + List<String> instances; + if (cmd.hasOption(tag)) { + String instanceTag = cmd.getOptionValues(tag)[0]; + instances = setupTool.getClusterManagementTool() + .getInstancesInClusterWithTag(clusterName, instanceTag); + } else { + instances = + setupTool.getClusterManagementTool().getInstancesInCluster(clusterName); + } + System.out.println("Instances in cluster " + clusterName + ":"); for (String instanceName : instances) { System.out.println(instanceName); @@ -1251,7 +1314,12 @@ public class ClusterSetup { String clusterName = cmd.getOptionValues(enableResource)[0]; String resourceName = cmd.getOptionValues(enableResource)[1]; boolean enabled = Boolean.parseBoolean(cmd.getOptionValues(enableResource)[2].toLowerCase()); - setupTool.getClusterManagementTool().enableResource(clusterName, resourceName, enabled); + if (cmd.hasOption(tag)) { + String resourceTag = 
cmd.getOptionValues(tag)[0]; + setupTool.enableResource(clusterName, resourceName, resourceTag, enabled); + } else { + setupTool.getClusterManagementTool().enableResource(clusterName, resourceName, enabled); + } } else if (cmd.hasOption(enablePartition)) { String[] args = cmd.getOptionValues(enablePartition); http://git-wip-us.apache.org/repos/asf/helix/blob/a23beb7c/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java b/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java new file mode 100644 index 0000000..3466b2f --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java @@ -0,0 +1,465 @@ +package org.apache.helix.integration; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. 
*/

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Date;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CountDownLatch;
import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixConstants;
import org.apache.helix.TestHelper;
import org.apache.helix.integration.manager.ClusterControllerManager;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.integration.manager.ZkTestManager;
import org.apache.helix.manager.zk.CallbackHandler;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkClient;
import org.apache.helix.mock.participant.DummyProcess;
import org.apache.helix.model.IdealState;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.OnlineOfflineSMD;
import org.apache.helix.spectator.RoutingTableProvider;
import org.apache.helix.tools.ClusterSetup;
import org.apache.helix.tools.ClusterStateVerifier;
import org.testng.Assert;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.NotificationContext;
import org.apache.helix.model.Message;
import org.apache.helix.participant.StateMachineEngine;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelFactory;
import org.apache.log4j.Logger;
import org.testng.annotations.AfterClass;
import org.testng.annotations.BeforeClass;
import org.testng.annotations.Test;
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;

/**
 * End-to-end test for resources that share one resource-group name across several instance
 * groups.
 * <p>
 * The fixture creates {@link #INSTANCE_GROUP_NR} instance groups of {@link #GROUP_NODE_NR}
 * participants each, tags every participant with its group name ({@code cluster_<i>}) and adds one
 * {@link #TEST_DB} ideal state per group. The tests then query a {@link RoutingTableProvider} by
 * resource-group name, optionally restricted to a list of group tags, and check the number of
 * instances returned before and after disabling one group's resource.
 */
public class TestResourceGroupEndtoEnd extends ZkIntegrationTestBase {

  protected static final int GROUP_NODE_NR = 5;
  protected static final int START_PORT = 12918;
  protected static final String STATE_MODEL = "OnlineOffline";
  protected static final String TEST_DB = "TestDB";
  protected static final int PARTITIONS = 20;
  protected static final int INSTANCE_GROUP_NR = 4;
  protected static final int TOTAL_NODE_NR = GROUP_NODE_NR * INSTANCE_GROUP_NR;

  protected final String CLASS_NAME = getShortClassName();
  protected final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;

  protected TestParticipantManager[] _participants = new TestParticipantManager[TOTAL_NODE_NR];
  protected ClusterControllerManager _controller;
  protected RoutingTableProvider _routingTableProvider;
  private HelixAdmin _admin;
  HelixManager _spectator;

  int _replica = 3;

  /**
   * Builds the cluster: instance groups, one ideal state per group, participants, controller and
   * finally a spectator whose external-view listener feeds the routing table under test.
   */
  @BeforeClass
  public void beforeClass() throws Exception {
    _admin = new ZKHelixAdmin(_gZkClient);

    // setup storage cluster
    _gSetupTool.addCluster(CLUSTER_NAME, true);

    List<String> instanceGroupTags = new ArrayList<String>();
    for (int groupIdx = 0; groupIdx < INSTANCE_GROUP_NR; groupIdx++) {
      String groupTag = "cluster_" + groupIdx;
      addInstanceGroup(CLUSTER_NAME, groupTag, GROUP_NODE_NR);
      instanceGroupTags.add(groupTag);
    }

    // one ideal state per instance group, all sharing the TEST_DB resource-group name
    for (String tag : instanceGroupTags) {
      List<String> instances = _admin.getInstancesInClusterWithTag(CLUSTER_NAME, tag);
      IdealState idealState =
          createIdealState(TEST_DB, tag, instances, PARTITIONS, _replica,
              IdealState.RebalanceMode.CUSTOMIZED.toString());
      _gSetupTool.addResourceToCluster(CLUSTER_NAME, idealState.getResourceName(), idealState);
    }

    // start dummy participants
    int participantIdx = 0;
    for (String group : instanceGroupTags) {
      List<String> instances = _admin.getInstancesInClusterWithTag(CLUSTER_NAME, group);
      for (String instance : instances) {
        TestParticipantManager participant =
            new TestParticipantManager(ZK_ADDR, CLUSTER_NAME, TEST_DB, group, instance);
        _participants[participantIdx++] = participant;
        participant.syncStart();
      }
    }

    // start controller
    String controllerName = CONTROLLER_PREFIX + "_0";
    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName);
    _controller.syncStart();

    boolean result =
        ClusterStateVerifier.verifyByZkCallback(
            new ClusterStateVerifier.BestPossAndExtViewZkVerifier(ZK_ADDR, CLUSTER_NAME));
    Assert.assertTrue(result);

    // start spectator
    _routingTableProvider = new RoutingTableProvider();
    _spectator =
        HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "spectator", InstanceType.SPECTATOR,
            ZK_ADDR);
    _spectator.connect();
    _spectator.addExternalViewChangeListener(_routingTableProvider);
    // give the listener time to receive its first external-view callback
    Thread.sleep(1000);
  }

  /**
   * Stops whatever {@link #beforeClass()} managed to start. Every component is null-checked so
   * that a failure half-way through the set-up still releases the components that were started
   * instead of aborting the tear-down with a {@link NullPointerException}.
   */
  @AfterClass
  public void afterClass() {
    // stop participants
    for (TestParticipantManager participant : _participants) {
      if (participant != null) {
        participant.syncStop();
      }
    }

    if (_controller != null) {
      _controller.syncStop();
    }
    if (_spectator != null) {
      _spectator.disconnect();
    }
  }

  /**
   * Creates the ideal state of one resource inside a resource group and pre-assigns every
   * partition to {@code replica} consecutive instances (round-robin over {@code instanceNames}),
   * all in the ONLINE state.
   *
   * @param resourceGroupName name shared by all resources of the group; also the partition prefix
   * @param instanceGroupTag tag of the instance group hosting this resource
   * @param instanceNames instances the partitions are placed on
   * @param numPartition number of partitions to create
   * @param replica number of ONLINE replicas per partition
   * @param rebalanceMode name of the {@link IdealState.RebalanceMode} to use
   * @return the populated ideal state (not yet written to the cluster)
   */
  public IdealState createIdealState(String resourceGroupName, String instanceGroupTag,
      List<String> instanceNames, int numPartition, int replica, String rebalanceMode) {
    IdealState is =
        _gSetupTool.createIdealStateForResourceGroup(resourceGroupName, instanceGroupTag,
            numPartition, replica, rebalanceMode, STATE_MODEL);

    // setup initial partition->instance mapping.
    int numNode = instanceNames.size();
    // checked unconditionally; the Java 'assert' keyword is skipped when the JVM runs without -ea
    Assert.assertTrue(numNode >= replica,
        "need at least " + replica + " instances but got " + numNode);

    String onlineState = OnlineOfflineSMD.States.ONLINE.toString();
    for (int partition = 0; partition < numPartition; partition++) {
      String partitionName = resourceGroupName + "_" + partition;
      // partition p starts at instance p and wraps around the instance list
      for (int r = 0; r < replica; r++) {
        is.setPartitionState(partitionName, instanceNames.get((partition + r) % numNode),
            onlineState);
      }
    }

    return is;
  }

  /**
   * Adds {@code numInstance} instances to the cluster and tags each of them with
   * {@code instanceTag}. Instance names embed the tag so that names stay unique across groups
   * although every group reuses the same port range.
   */
  private void addInstanceGroup(String clusterName, String instanceTag, int numInstance) {
    for (int i = 0; i < numInstance; i++) {
      String storageNodeName = PARTICIPANT_PREFIX + "_" + instanceTag + "_" + (START_PORT + i);
      _gSetupTool.addInstanceToCluster(clusterName, storageNodeName);
      _gSetupTool.addInstanceTag(clusterName, storageNodeName, instanceTag);
    }
  }

  /**
   * Checks routing-table look-ups by resource group, with and without a partition name and with
   * and without a restriction to selected group tags.
   */
  @Test
  public void testRoutingTable() throws Exception {
    // Verify routing table works
    Set<InstanceConfig> allOnlineNodes =
        _routingTableProvider.getInstancesForResourceGroup(TEST_DB, "ONLINE");
    Assert.assertEquals(allOnlineNodes.size(), TOTAL_NODE_NR);

    List<InstanceConfig> onlinePartitions =
        _routingTableProvider.getInstancesForResourceGroup(TEST_DB, TEST_DB + "_0", "ONLINE");
    Assert.assertEquals(onlinePartitions.size(), INSTANCE_GROUP_NR * _replica);

    List<String> selectedTags = Arrays.asList("cluster_2", "cluster_3");

    Set<InstanceConfig> selectedNodes =
        _routingTableProvider.getInstancesForResourceGroup(TEST_DB, "ONLINE", selectedTags);
    Assert.assertEquals(selectedNodes.size(), GROUP_NODE_NR * 2);

    List<InstanceConfig> selectedPartition =
        _routingTableProvider.getInstancesForResourceGroup(TEST_DB, TEST_DB + "_0", "ONLINE",
            selectedTags);
    Assert.assertEquals(selectedPartition.size(), _replica * 2);
  }

  /**
   * Disables and re-enables the resource of one instance group and checks that the routing table
   * drops, then regains, that group's instances.
   */
  @Test(dependsOnMethods = { "testRoutingTable" })
  public void testEnableDisableClusters() throws InterruptedException {
    List<String> selectedTags = Arrays.asList("cluster_2", "cluster_3");

    // disable a resource
    _gSetupTool.enableResource(CLUSTER_NAME, TEST_DB, "cluster_2", false);

    // wait for the change to reach the routing table
    Thread.sleep(500);

    Set<InstanceConfig> selectedNodes =
        _routingTableProvider.getInstancesForResourceGroup(TEST_DB, "ONLINE", selectedTags);
    Assert.assertEquals(selectedNodes.size(), GROUP_NODE_NR * 1);

    List<InstanceConfig> selectedPartition =
        _routingTableProvider.getInstancesForResourceGroup(TEST_DB, TEST_DB + "_0", "ONLINE",
            selectedTags);
    Assert.assertEquals(selectedPartition.size(), _replica * 1);

    // enable a resource
    _gSetupTool.enableResource(CLUSTER_NAME, TEST_DB, "cluster_2", true);
    Thread.sleep(500);

    selectedNodes =
        _routingTableProvider.getInstancesForResourceGroup(TEST_DB, "ONLINE", selectedTags);
    Assert.assertEquals(selectedNodes.size(), GROUP_NODE_NR * 2);

    selectedPartition =
        _routingTableProvider.getInstancesForResourceGroup(TEST_DB, TEST_DB + "_0", "ONLINE",
            selectedTags);
    Assert.assertEquals(selectedPartition.size(), _replica * 2);
  }

  /**
   * Stand-alone participant launcher: creates a ZooKeeper-backed participant manager, registers
   * the mock OnlineOffline state-model factory on it and connects it.
   */
  public static class MockProcess {
    // bound to MockProcess itself so log lines are attributed to this class
    private static final Logger logger = Logger.getLogger(MockProcess.class);

    private final String _zkConnectString;
    private final String _clusterName;
    private final String _instanceName;
    private final String _resourceName;
    private final String _resourceTag;

    private final int _transDelayInMs;
    private final String _clusterMangerType;

    /**
     * @param zkConnectString ZooKeeper connect string
     * @param clusterName cluster to join
     * @param resourceName resource-group name incoming messages are expected to carry
     * @param instanceName name of this participant
     * @param resourceTag resource tag incoming messages are expected to carry
     * @param clusterMangerType cluster manager type; only "zk" is accepted by {@link #start()}
     * @param delay simulated transition delay in milliseconds; non-positive values mean no delay
     */
    public MockProcess(String zkConnectString, String clusterName, String resourceName,
        String instanceName, String resourceTag,
        String clusterMangerType, int delay) {
      _zkConnectString = zkConnectString;
      _clusterName = clusterName;
      _resourceName = resourceName;
      _resourceTag = resourceTag;
      _instanceName = instanceName;
      _clusterMangerType = clusterMangerType;
      _transDelayInMs = delay > 0 ? delay : 0;
    }

    /**
     * Sleeps for {@code transDelay} milliseconds; returns immediately for non-positive values. If
     * the thread is interrupted the sleep ends early and the interrupt status is restored so the
     * caller can still observe it.
     */
    static void sleep(long transDelay) {
      if (transDelay <= 0) {
        return;
      }
      try {
        Thread.sleep(transDelay);
      } catch (InterruptedException e) {
        logger.warn("interrupted while simulating a state-transition delay", e);
        Thread.currentThread().interrupt();
      }
    }

    /**
     * Creates the participant manager, registers the mock state-model factory and connects.
     *
     * @return the connected manager
     * @throws IllegalArgumentException if the configured cluster manager type is not "zk"
     * @throws Exception if connecting fails
     */
    public HelixManager start() throws Exception {
      // zk cluster manager is the only supported type
      if (!_clusterMangerType.equalsIgnoreCase("zk")) {
        throw new IllegalArgumentException("Unsupported cluster manager type:" + _clusterMangerType);
      }
      HelixManager manager =
          HelixManagerFactory.getZKHelixManager(_clusterName, _instanceName,
              InstanceType.PARTICIPANT, _zkConnectString);

      MockOnlineOfflineStateModelFactory stateModelFactory =
          new MockOnlineOfflineStateModelFactory(_transDelayInMs, _resourceName, _resourceTag,
              _instanceName);
      // the factory is registered before connect()
      StateMachineEngine stateMach = manager.getStateMachineEngine();
      stateMach.registerStateModelFactory(TestResourceGroupEndtoEnd.STATE_MODEL,
          stateModelFactory);

      manager.connect();
      return manager;
    }

    /**
     * Factory producing {@link MockOnlineOfflineStateModel}s that all expect the same instance
     * name, resource-group name and resource tag on the messages they receive.
     */
    public static class MockOnlineOfflineStateModelFactory extends
        StateModelFactory<MockOnlineOfflineStateModel> {
      int _delay;
      String _instanceName;
      String _resourceName;
      String _resourceTag;

      public MockOnlineOfflineStateModelFactory(int delay, String resourceName, String resourceTag,
          String instanceName) {
        _delay = delay;
        _instanceName = instanceName;
        _resourceName = resourceName;
        _resourceTag = resourceTag;
      }

      /**
       * Returns a model configured from this factory's settings; the arguments supplied by the
       * framework are not consulted.
       */
      @Override
      public MockOnlineOfflineStateModel createNewStateModel(String resourceName, String stateUnitKey) {
        MockOnlineOfflineStateModel model = new MockOnlineOfflineStateModel();
        model.setDelay(_delay);
        model.setInstanceName(_instanceName);
        model.setResourceName(_resourceName);
        model.setResourceTag(_resourceTag);
        return model;
      }
    }

    /**
     * OnlineOffline state model that sleeps for the configured delay, logs the transition and
     * then verifies that the message was addressed to the expected instance, resource group and
     * resource tag.
     */
    public static class MockOnlineOfflineStateModel extends StateModel {
      int _transDelay = 0;
      String _instanceName;
      String _resourceName;
      String _resourceTag;

      public void setDelay(int delay) {
        _transDelay = delay > 0 ? delay : 0;
      }

      public void setInstanceName(String instanceName) {
        _instanceName = instanceName;
      }

      public void setResourceTag(String resourceTag) {
        _resourceTag = resourceTag;
      }

      public void setResourceName(String resourceName) {
        _resourceName = resourceName;
      }

      public void onBecomeOnlineFromOffline(Message message, NotificationContext context) {
        String db = message.getPartitionName();
        String instanceName = context.getManager().getInstanceName();
        MockProcess.sleep(_transDelay);

        logger.info("MockStateModel.onBecomeOnlineFromOffline(), instance:" + instanceName + ", db:"
            + db);

        logger.info(
            "MockStateModel.onBecomeOnlineFromOffline(), resource " + message.getResourceName()
                + ", partition"
                + message.getPartitionName());

        verifyMessage(message);
      }

      public void onBecomeOfflineFromOnline(Message message, NotificationContext context) {
        MockProcess.sleep(_transDelay);

        logger.info(
            "MockStateModel.onBecomeOfflineFromOnline(), resource " + message.getResourceName()
                + ", partition"
                + message.getPartitionName() + ", targetName: " + message.getTgtName());

        // same extra 1 ms pause as before, now routed through the interrupt-preserving helper
        MockProcess.sleep(1);

        verifyMessage(message);
      }

      public void onBecomeDroppedFromOffline(Message message, NotificationContext context) {
        MockProcess.sleep(_transDelay);

        logger.info(
            "MockStateModel.onBecomeDroppedFromOffline(), resource " + message.getResourceName()
                + ", partition"
                + message.getPartitionName());

        verifyMessage(message);
      }

      /**
       * Fails with an {@link AssertionError} when the message does not carry the expected target
       * instance, resource-group name or resource tag. TestNG assertions are used instead of the
       * {@code assert} keyword so the check also runs when JVM assertions are disabled.
       */
      private void verifyMessage(Message message) {
        Assert.assertEquals(message.getTgtName(), _instanceName, "unexpected target instance");
        Assert.assertEquals(message.getResourceGroupName(), _resourceName,
            "unexpected resource group name");
        Assert.assertEquals(message.getResourceTag(), _resourceTag, "unexpected resource tag");
      }
    }
  }

  /**
   * Participant manager that runs on its own thread: it registers the mock state-model factory,
   * connects, and stays connected until {@link #syncStop()} is called.
   */
  public static class TestParticipantManager extends ZKHelixManager implements Runnable, ZkTestManager {
    private static final Logger LOG = Logger.getLogger(TestParticipantManager.class);

    private final CountDownLatch _startCountDown = new CountDownLatch(1);
    private final CountDownLatch _stopCountDown = new CountDownLatch(1);
    private final CountDownLatch _waitStopCompleteCountDown = new CountDownLatch(1);

    private final String _instanceGroup;
    private final String _resourceName;

    /**
     * @param zkAddr ZooKeeper address
     * @param clusterName cluster to join
     * @param resourceName resource-group name the state models expect on messages
     * @param instanceGroup instance-group tag the state models expect as the resource tag
     * @param instanceName name of this participant
     */
    public TestParticipantManager(String zkAddr, String clusterName, String resourceName,
        String instanceGroup, String instanceName) {
      super(clusterName, instanceName, InstanceType.PARTICIPANT, zkAddr);
      _instanceGroup = instanceGroup;
      _resourceName = resourceName;
    }

    /** Signals the participant thread to stop and waits until it has disconnected. */
    public void syncStop() {
      _stopCountDown.countDown();
      try {
        _waitStopCompleteCountDown.await();
      } catch (InterruptedException e) {
        LOG.error("exception in syncStop participant-manager", e);
      }
    }

    /** Starts the participant thread and waits until it has connected, or failed to connect. */
    public void syncStart() {
      try {
        new Thread(this).start();
        _startCountDown.await();
      } catch (InterruptedException e) {
        LOG.error("exception in syncStart participant-manager", e);
      }
    }

    @Override
    public void run() {
      try {
        StateMachineEngine stateMach = getStateMachineEngine();
        MockProcess.MockOnlineOfflineStateModelFactory ofModelFactory =
            new MockProcess.MockOnlineOfflineStateModelFactory(10, _resourceName, _instanceGroup,
                getInstanceName());
        stateMach.registerStateModelFactory(TestResourceGroupEndtoEnd.STATE_MODEL, ofModelFactory);

        connect();
        _startCountDown.countDown();

        // park until syncStop() is called
        _stopCountDown.await();
      } catch (InterruptedException e) {
        // NOTE(review): the interrupt status is not restored here because disconnect() runs in
        // the finally block right after; confirm it tolerates a pending interrupt before changing.
        String msg =
            "participant: " + getInstanceName() + ", " + Thread.currentThread().getName()
                + " is interrupted";
        LOG.info(msg);
      } catch (Exception e) {
        LOG.error("exception running participant-manager", e);
      } finally {
        // also release syncStart() when connect() failed before the latch was counted down
        _startCountDown.countDown();

        disconnect();
        _waitStopCompleteCountDown.countDown();
      }
    }

    @Override
    public ZkClient getZkClient() {
      return _zkclient;
    }

    @Override
    public List<CallbackHandler> getHandlers() {
      return _handlers;
    }
  }
}
