Improve the data load in Helix Spectator (RoutingTableProvider), which includes: 1) Put the event callback handler in a separate thread so other ZK event callbacks won't be blocked. 2) Deduplicate the callbacks from the same event type, always keeping just the one latest copy of the event callback in the event queue. 3) Add methods to return all instances and liveInstances in the cluster.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/093f7ab9 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/093f7ab9 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/093f7ab9 Branch: refs/heads/master Commit: 093f7ab949b1d6d77f8589b96e7a1a10762f7e6b Parents: 2f3a56f Author: Lei Xia <[email protected]> Authored: Wed Nov 8 14:56:09 2017 -0800 Committer: Junkai Xue <[email protected]> Committed: Wed Jan 24 18:31:51 2018 -0800 ---------------------------------------------------------------------- .../helix/common/ClusterEventBlockingQueue.java | 126 ++++++ .../helix/common/ClusterEventProcessor.java | 57 +++ .../controller/GenericHelixController.java | 7 +- .../stages/ClusterEventBlockingQueue.java | 124 ----- .../helix/spectator/RoutingDataCache.java | 161 +++++++ .../apache/helix/spectator/RoutingTable.java | 437 ++++++++++++++++++ .../helix/spectator/RoutingTableProvider.java | 453 ++++++------------- .../java/org/apache/helix/TestRoutingTable.java | 3 +- .../stages/TestClusterEventBlockingQueue.java | 1 + .../Spectator/TestRoutingTableProvider.java | 173 +++++++ .../helix/integration/TestBasicSpectator.java | 1 + .../integration/TestResourceGroupEndtoEnd.java | 29 +- .../common/ZkIntegrationTestBase.java | 39 ++ .../manager/MockParticipantManager.java | 6 +- .../messaging/TestP2PMessageSemiAuto.java | 18 +- 15 files changed, 1147 insertions(+), 488 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/093f7ab9/helix-core/src/main/java/org/apache/helix/common/ClusterEventBlockingQueue.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/common/ClusterEventBlockingQueue.java b/helix-core/src/main/java/org/apache/helix/common/ClusterEventBlockingQueue.java new file mode 100644 index 0000000..075edf9 --- /dev/null +++ 
b/helix-core/src/main/java/org/apache/helix/common/ClusterEventBlockingQueue.java @@ -0,0 +1,126 @@ +package org.apache.helix.common; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Map; +import java.util.Queue; +import java.util.concurrent.BlockingQueue; + +import org.apache.helix.controller.stages.ClusterEvent; +import org.apache.helix.controller.stages.ClusterEventType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; + +/** + * A blocking queue of ClusterEvent objects to be used by the controller pipeline. This prevents + * multiple events of the same type from flooding the controller and preventing progress from being + * made. This queue has no capacity. This class is meant to be a limited implementation of the + * {@link BlockingQueue} interface. 
+ */ +public class ClusterEventBlockingQueue { + private static final Logger LOG = LoggerFactory.getLogger(ClusterEventBlockingQueue.class); + private final Map<ClusterEventType, ClusterEvent> _eventMap; + private final Queue<ClusterEvent> _eventQueue; + + /** + * Instantiate the queue + */ + public ClusterEventBlockingQueue() { + _eventMap = Maps.newHashMap(); + _eventQueue = Lists.newLinkedList(); + } + + /** + * Remove all events from the queue + */ + public synchronized void clear() { + _eventMap.clear(); + _eventQueue.clear(); + } + + /** + * Add a single event to the queue, overwriting events with the same name + * @param event ClusterEvent event to add + */ + public synchronized void put(ClusterEvent event) { + if (!_eventMap.containsKey(event.getEventType())) { + // only insert if there isn't a same-named event already present + boolean result = _eventQueue.offer(event); + if (!result) { + return; + } + } + // always overwrite in case this is a FINALIZE + _eventMap.put(event.getEventType(), event); + LOG.debug("Putting event " + event.getEventType()); + LOG.debug("Event queue size: " + _eventQueue.size()); + notify(); + } + + /** + * Remove an element from the front of the queue, blocking if none is available. This method + * will return the most recent event seen with the oldest enqueued event name. 
+ * @return ClusterEvent at the front of the queue + * @throws InterruptedException if the wait for elements was interrupted + */ + public synchronized ClusterEvent take() throws InterruptedException { + while (_eventQueue.isEmpty()) { + wait(); + } + ClusterEvent queuedEvent = _eventQueue.poll(); + if (queuedEvent != null) { + LOG.debug("Taking event " + queuedEvent.getEventType()); + LOG.debug("Event queue size: " + _eventQueue.size()); + return _eventMap.remove(queuedEvent.getEventType()); + } + return null; + } + + /** + * Get at the head of the queue without removing it + * @return ClusterEvent at the front of the queue, or null if none available + */ + public synchronized ClusterEvent peek() { + ClusterEvent queuedEvent = _eventQueue.peek(); + if (queuedEvent != null) { + return _eventMap.get(queuedEvent.getEventType()); + } + return queuedEvent; + } + + /** + * Get the queue size + * @return integer size of the queue + */ + public int size() { + return _eventQueue.size(); + } + + /** + * Check if the queue is empty + * @return true if events are not present, false otherwise + */ + public boolean isEmpty() { + return _eventQueue.isEmpty(); + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/093f7ab9/helix-core/src/main/java/org/apache/helix/common/ClusterEventProcessor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/common/ClusterEventProcessor.java b/helix-core/src/main/java/org/apache/helix/common/ClusterEventProcessor.java new file mode 100644 index 0000000..6001edc --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/common/ClusterEventProcessor.java @@ -0,0 +1,57 @@ +package org.apache.helix.common; + +import org.I0Itec.zkclient.exception.ZkInterruptedException; +import org.apache.helix.controller.stages.ClusterEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A generic extended single-thread class to handle ClusterEvent 
(multiple-producer/single consumer + * style). + */ +public abstract class ClusterEventProcessor extends Thread { + private static final Logger logger = LoggerFactory.getLogger(ClusterEventProcessor.class); + + protected final ClusterEventBlockingQueue _eventQueue; + protected final String _clusterName; + protected final String _processorName; + + public ClusterEventProcessor(String clusterName) { + this(clusterName, "Helix-ClusterEventProcessor"); + } + + public ClusterEventProcessor(String clusterName, String processorName) { + super(processorName + "-" + clusterName); + _processorName = processorName; + _eventQueue = new ClusterEventBlockingQueue(); + _clusterName = clusterName; + } + + @Override + public void run() { + logger.info("START " + _processorName + " thread for cluster " + _clusterName); + while (!isInterrupted()) { + try { + ClusterEvent event = _eventQueue.take(); + handleEvent(event); + } catch (InterruptedException e) { + logger.warn(_processorName + " thread interrupted", e); + interrupt(); + } catch (ZkInterruptedException e) { + logger.warn(_processorName + " thread caught a ZK connection interrupt", e); + interrupt(); + } catch (ThreadDeath death) { + throw death; + } catch (Throwable t) { + logger.error(_processorName + " thread failed while running the controller pipeline", t); + } + } + logger.info("END " + _processorName + " thread"); + } + + protected abstract void handleEvent(ClusterEvent event); + + public void queueEvent(ClusterEvent event) { + _eventQueue.put(event); + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/093f7ab9/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java index c182ada..6d1af7c 100644 --- 
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java +++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java @@ -53,7 +53,7 @@ import org.apache.helix.controller.stages.AttributeName; import org.apache.helix.controller.stages.BestPossibleStateCalcStage; import org.apache.helix.controller.stages.ClusterDataCache; import org.apache.helix.controller.stages.ClusterEvent; -import org.apache.helix.controller.stages.ClusterEventBlockingQueue; +import org.apache.helix.common.ClusterEventBlockingQueue; import org.apache.helix.controller.stages.ClusterEventType; import org.apache.helix.controller.stages.CompatibilityCheckStage; import org.apache.helix.controller.stages.CurrentStateComputationStage; @@ -581,7 +581,6 @@ public class GenericHelixController implements IdealStateChangeListener, .info("END: GenericClusterController.onClusterConfigChange() for cluster " + _clusterName); } - private void notifyCaches(NotificationContext context, ChangeType changeType) { if (context == null || context.getType() != Type.CALLBACK) { _cache.requireFullRefresh(); @@ -759,13 +758,15 @@ public class GenericHelixController implements IdealStateChangeListener, } } + + // TODO: refactor this to use common/ClusterEventProcessor. 
private class ClusterEventProcessor extends Thread { private final ClusterDataCache _cache; private final ClusterEventBlockingQueue _eventBlockingQueue; public ClusterEventProcessor(ClusterDataCache cache, ClusterEventBlockingQueue eventBlockingQueue) { - super("GerenricHelixController-event_process"); + super("GenericHelixController-event_process"); _cache = cache; _eventBlockingQueue = eventBlockingQueue; } http://git-wip-us.apache.org/repos/asf/helix/blob/093f7ab9/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventBlockingQueue.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventBlockingQueue.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventBlockingQueue.java deleted file mode 100644 index 521c43e..0000000 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventBlockingQueue.java +++ /dev/null @@ -1,124 +0,0 @@ -package org.apache.helix.controller.stages; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. 
- */ - -import java.util.Map; -import java.util.Queue; -import java.util.concurrent.BlockingQueue; - -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; - -/** - * A blocking queue of ClusterEvent objects to be used by the controller pipeline. This prevents - * multiple events of the same type from flooding the controller and preventing progress from being - * made. This queue has no capacity. This class is meant to be a limited implementation of the - * {@link BlockingQueue} interface. - */ -public class ClusterEventBlockingQueue { - private static final Logger LOG = LoggerFactory.getLogger(ClusterEventBlockingQueue.class); - private final Map<ClusterEventType, ClusterEvent> _eventMap; - private final Queue<ClusterEvent> _eventQueue; - - /** - * Instantiate the queue - */ - public ClusterEventBlockingQueue() { - _eventMap = Maps.newHashMap(); - _eventQueue = Lists.newLinkedList(); - } - - /** - * Remove all events from the queue - */ - public synchronized void clear() { - _eventMap.clear(); - _eventQueue.clear(); - } - - /** - * Add a single event to the queue, overwriting events with the same name - * @param event ClusterEvent event to add - */ - public synchronized void put(ClusterEvent event) { - if (!_eventMap.containsKey(event.getEventType())) { - // only insert if there isn't a same-named event already present - boolean result = _eventQueue.offer(event); - if (!result) { - return; - } - } - // always overwrite in case this is a FINALIZE - _eventMap.put(event.getEventType(), event); - LOG.debug("Putting event " + event.getEventType()); - LOG.debug("Event queue size: " + _eventQueue.size()); - notify(); - } - - /** - * Remove an element from the front of the queue, blocking if none is available. This method - * will return the most recent event seen with the oldest enqueued event name. 
- * @return ClusterEvent at the front of the queue - * @throws InterruptedException if the wait for elements was interrupted - */ - public synchronized ClusterEvent take() throws InterruptedException { - while (_eventQueue.isEmpty()) { - wait(); - } - ClusterEvent queuedEvent = _eventQueue.poll(); - if (queuedEvent != null) { - LOG.debug("Taking event " + queuedEvent.getEventType()); - LOG.debug("Event queue size: " + _eventQueue.size()); - return _eventMap.remove(queuedEvent.getEventType()); - } - return null; - } - - /** - * Get at the head of the queue without removing it - * @return ClusterEvent at the front of the queue, or null if none available - */ - public synchronized ClusterEvent peek() { - ClusterEvent queuedEvent = _eventQueue.peek(); - if (queuedEvent != null) { - return _eventMap.get(queuedEvent.getEventType()); - } - return queuedEvent; - } - - /** - * Get the queue size - * @return integer size of the queue - */ - public int size() { - return _eventQueue.size(); - } - - /** - * Check if the queue is empty - * @return true if events are not present, false otherwise - */ - public boolean isEmpty() { - return _eventQueue.isEmpty(); - } -} http://git-wip-us.apache.org/repos/asf/helix/blob/093f7ab9/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java new file mode 100644 index 0000000..2da0f97 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java @@ -0,0 +1,161 @@ +package org.apache.helix.spectator; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.helix.HelixConstants; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.PropertyKey; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.model.LiveInstance; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Cache the cluster data that are needed by RoutingTableProvider. 
+ */ +public class RoutingDataCache { + private static final Logger LOG = LoggerFactory.getLogger(RoutingDataCache.class.getName()); + + private Map<String, LiveInstance> _liveInstanceMap; + private Map<String, InstanceConfig> _instanceConfigMap; + private Map<String, ExternalView> _externalViewMap; + String _clusterName; + + private Map<HelixConstants.ChangeType, Boolean> _propertyDataChangedMap; + + public RoutingDataCache(String clusterName) { + _propertyDataChangedMap = new ConcurrentHashMap<>(); + for (HelixConstants.ChangeType type : HelixConstants.ChangeType.values()) { + _propertyDataChangedMap.put(type, Boolean.valueOf(true)); + } + _clusterName = clusterName; + } + + /** + * This refreshes the cluster data by re-fetching the data from zookeeper in an efficient way + * + * @param accessor + * + * @return + */ + public synchronized void refresh(HelixDataAccessor accessor) { + LOG.info("START: ClusterDataCache.refresh()"); + long startTime = System.currentTimeMillis(); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); + + if (_propertyDataChangedMap.get(HelixConstants.ChangeType.EXTERNAL_VIEW)) { + long start = System.currentTimeMillis(); + _propertyDataChangedMap.put(HelixConstants.ChangeType.EXTERNAL_VIEW, Boolean.valueOf(false)); + _externalViewMap = accessor.getChildValuesMap(keyBuilder.externalViews()); + if (LOG.isDebugEnabled()) { + LOG.debug("Reload ExternalViews: " + _externalViewMap.keySet() + ". 
Takes " + ( + System.currentTimeMillis() - start) + " ms"); + } + } + + if (_propertyDataChangedMap.get(HelixConstants.ChangeType.LIVE_INSTANCE)) { + _propertyDataChangedMap.put(HelixConstants.ChangeType.LIVE_INSTANCE, Boolean.valueOf(false)); + _liveInstanceMap = accessor.getChildValuesMap(keyBuilder.liveInstances()); + LOG.debug("Reload LiveInstances: " + _liveInstanceMap.keySet()); + } + + if (_propertyDataChangedMap.get(HelixConstants.ChangeType.INSTANCE_CONFIG)) { + _propertyDataChangedMap + .put(HelixConstants.ChangeType.INSTANCE_CONFIG, Boolean.valueOf(false)); + _instanceConfigMap = accessor.getChildValuesMap(keyBuilder.instanceConfigs()); + LOG.debug("Reload InstanceConfig: " + _instanceConfigMap.keySet()); + } + + long endTime = System.currentTimeMillis(); + LOG.info( + "END: RoutingDataCache.refresh() for cluster " + _clusterName + ", took " + (endTime + - startTime) + " ms"); + + if (LOG.isDebugEnabled()) { + LOG.debug("LiveInstances: " + _liveInstanceMap.keySet()); + for (LiveInstance instance : _liveInstanceMap.values()) { + LOG.debug("live instance: " + instance.getInstanceName() + " " + instance.getSessionId()); + } + LOG.debug("ExternalViews: " + _externalViewMap.keySet()); + LOG.debug("InstanceConfigs: " + _instanceConfigMap.keySet()); + } + } + + /** + * Retrieves the ExternalView for all resources + * + * @return + */ + public Map<String, ExternalView> getExternalViews() { + return Collections.unmodifiableMap(_externalViewMap); + } + + /** + * Returns the LiveInstances for each of the instances that are curretnly up and running + * + * @return + */ + public Map<String, LiveInstance> getLiveInstances() { + return Collections.unmodifiableMap(_liveInstanceMap); + } + + /** + * Returns the instance config map + * + * @return + */ + public Map<String, InstanceConfig> getInstanceConfigMap() { + return Collections.unmodifiableMap(_instanceConfigMap); + } + + /** + * Notify the cache that some part of the cluster data has been changed. 
+ */ + public void notifyDataChange(HelixConstants.ChangeType changeType, String pathChanged) { + _propertyDataChangedMap.put(changeType, Boolean.valueOf(true)); + } + + /** + * Indicate that a full read should be done on the next refresh + */ + public synchronized void requireFullRefresh() { + for(HelixConstants.ChangeType type : HelixConstants.ChangeType.values()) { + _propertyDataChangedMap.put(type, Boolean.valueOf(true)); + } + } + + /** + * toString method to print the data cache state + */ + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("liveInstaceMap:" + _liveInstanceMap).append("\n"); + sb.append("externalViewMap:" + _externalViewMap).append("\n"); + sb.append("instanceConfigMap:" + _instanceConfigMap).append("\n"); + + return sb.toString(); + } +} + http://git-wip-us.apache.org/repos/asf/helix/blob/093f7ab9/helix-core/src/main/java/org/apache/helix/spectator/RoutingTable.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTable.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTable.java new file mode 100644 index 0000000..564a218 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTable.java @@ -0,0 +1,437 @@ +package org.apache.helix.spectator; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.TreeSet; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.model.LiveInstance; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * A class to consume ExternalViews of a cluster and provide {resource, partition, state} to + * {instances} map function. 
+ */ +class RoutingTable { + private static final Logger logger = LoggerFactory.getLogger(RoutingTable.class); + + // mapping a resourceName to the ResourceInfo + private final Map<String, ResourceInfo> _resourceInfoMap; + // mapping a resource group name to a resourceGroupInfo + private final Map<String, ResourceGroupInfo> _resourceGroupInfoMap; + private final Collection<LiveInstance> _liveInstances; + private final Collection<InstanceConfig> _instanceConfigs; + + public RoutingTable() { + this(Collections.<ExternalView>emptyList(), Collections.<InstanceConfig>emptyList(), + Collections.<LiveInstance>emptyList()); + } + + public RoutingTable(Collection<ExternalView> externalViews, Collection<InstanceConfig> instanceConfigs, + Collection<LiveInstance> liveInstances) { + _resourceInfoMap = new HashMap<>(); + _resourceGroupInfoMap = new HashMap<>(); + _liveInstances = liveInstances; + _instanceConfigs = instanceConfigs; + refresh(externalViews, instanceConfigs); + } + + private void addEntry(String resourceName, String partitionName, String state, + InstanceConfig config) { + if (!_resourceInfoMap.containsKey(resourceName)) { + _resourceInfoMap.put(resourceName, new ResourceInfo()); + } + ResourceInfo resourceInfo = _resourceInfoMap.get(resourceName); + resourceInfo.addEntry(partitionName, state, config); + } + + /** + * add an entry with a resource with resourceGrouping enabled. 
+ */ + private void addEntry(String resourceName, String resourceGroupName, String resourceTag, + String partitionName, String state, InstanceConfig config) { + addEntry(resourceName, partitionName, state, config); + + if (!_resourceGroupInfoMap.containsKey(resourceGroupName)) { + _resourceGroupInfoMap.put(resourceGroupName, new ResourceGroupInfo()); + } + + ResourceGroupInfo resourceGroupInfo = _resourceGroupInfoMap.get(resourceGroupName); + resourceGroupInfo.addEntry(resourceTag, partitionName, state, config); + } + + ResourceInfo get(String resourceName) { + return _resourceInfoMap.get(resourceName); + } + + ResourceGroupInfo getResourceGroup(String resourceGroupName) { + return _resourceGroupInfoMap.get(resourceGroupName); + } + + /** + * returns all instances for {resource} that are in a specific {state}. + * @param resourceName + * @param state + * @return empty list if there is no instance in a given state + */ + public Set<InstanceConfig> getInstancesForResource(String resourceName, String state) { + Set<InstanceConfig> instanceSet = null; + ResourceInfo resourceInfo = get(resourceName); + if (resourceInfo != null) { + instanceSet = resourceInfo.getInstances(state); + } + if (instanceSet == null) { + instanceSet = Collections.emptySet(); + } + return instanceSet; + } + + /** + * returns all instances for all resources in {resource group} that are in a specific {state} + * + * @param resourceGroupName + * @param state + * + * @return empty list if there is no instance in a given state + */ + public Set<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName, String state) { + Set<InstanceConfig> instanceSet = null; + ResourceGroupInfo resourceGroupInfo = getResourceGroup(resourceGroupName); + if (resourceGroupInfo != null) { + instanceSet = resourceGroupInfo.getInstances(state); + } + if (instanceSet == null) { + instanceSet = Collections.emptySet(); + } + return instanceSet; + } + + /** + * returns all instances for resources contains any 
given tags in {resource group} that are in a + * specific {state} + * + * @param resourceGroupName + * @param state + * + * @return empty list if there is no instance in a given state + */ + public Set<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName, String state, + List<String> resourceTags) { + Set<InstanceConfig> instanceSet = null; + ResourceGroupInfo resourceGroupInfo = getResourceGroup(resourceGroupName); + if (resourceGroupInfo != null) { + instanceSet = new HashSet<>(); + for (String tag : resourceTags) { + Set<InstanceConfig> instances = resourceGroupInfo.getInstances(state, tag); + if (instances != null) { + instanceSet.addAll(resourceGroupInfo.getInstances(state, tag)); + } + } + } + if (instanceSet == null) { + return Collections.emptySet(); + } + return instanceSet; + } + + /** + * returns the instances for {resource,partition} pair that are in a specific + * {state} + * @param resourceName + * @param partitionName + * @param state + * @return empty list if there is no instance in a given state + */ + public List<InstanceConfig> getInstancesForResource(String resourceName, String partitionName, + String state) { + List<InstanceConfig> instanceList = null; + ResourceInfo resourceInfo = get(resourceName); + if (resourceInfo != null) { + PartitionInfo keyInfo = resourceInfo.get(partitionName); + if (keyInfo != null) { + instanceList = keyInfo.get(state); + } + } + if (instanceList == null) { + instanceList = Collections.emptyList(); + } + return instanceList; + } + + /** + * returns the instances for {resource group,partition} pair in all resources belongs to the given + * resource group that are in a specific {state}. + * + * The return results aggregate all partition states from all the resources in the given resource + * group. 
+ * + * @param resourceGroupName + * @param partitionName + * @param state + * + * @return empty list if there is no instance in a given state + */ + public List<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName, + String partitionName, String state) { + List<InstanceConfig> instanceList = null; + ResourceGroupInfo resourceGroupInfo = getResourceGroup(resourceGroupName); + if (resourceGroupInfo != null) { + PartitionInfo keyInfo = resourceGroupInfo.get(partitionName); + if (keyInfo != null) { + instanceList = keyInfo.get(state); + } + } + if (instanceList == null) { + instanceList = Collections.emptyList(); + } + return instanceList; + } + + /** + * Return all liveInstances in the cluster now. + * @return + */ + protected Collection<LiveInstance> getLiveInstances() { + return _liveInstances; + } + + /** + * Return all instance's config in this cluster. + * @return + */ + protected Collection<InstanceConfig> getInstanceConfigs() { + return _instanceConfigs; + } + + /** + * returns the instances for {resource group,partition} pair contains any of the given tags + * that are in a specific {state}. + * + * Find all resources belongs to the given resource group that have any of the given resource tags + * and return the aggregated partition states from all these resources. 
+ * + * @param resourceGroupName + * @param partitionName + * @param state + * @param resourceTags + * + * @return empty list if there is no instance in a given state + */ + public List<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName, String partitionName, + String state, List<String> resourceTags) { + ResourceGroupInfo resourceGroupInfo = getResourceGroup(resourceGroupName); + List<InstanceConfig> instanceList = null; + if (resourceGroupInfo != null) { + instanceList = new ArrayList<>(); + for (String tag : resourceTags) { + RoutingTable.PartitionInfo keyInfo = resourceGroupInfo.get(partitionName, tag); + if (keyInfo != null && keyInfo.containsState(state)) { + instanceList.addAll(keyInfo.get(state)); + } + } + } + if (instanceList == null) { + return Collections.emptyList(); + } + + return instanceList; + } + + private void refresh(Collection<ExternalView> externalViewList, + Collection<InstanceConfig> instanceConfigList) { + Map<String, InstanceConfig> instanceConfigMap = new HashMap<>(); + for (InstanceConfig config : instanceConfigList) { + instanceConfigMap.put(config.getId(), config); + } + if (externalViewList != null) { + for (ExternalView extView : externalViewList) { + String resourceName = extView.getId(); + for (String partitionName : extView.getPartitionSet()) { + Map<String, String> stateMap = extView.getStateMap(partitionName); + for (String instanceName : stateMap.keySet()) { + String currentState = stateMap.get(instanceName); + if (instanceConfigMap.containsKey(instanceName)) { + InstanceConfig instanceConfig = instanceConfigMap.get(instanceName); + if (extView.isGroupRoutingEnabled()) { + addEntry(resourceName, extView.getResourceGroupName(), + extView.getInstanceGroupTag(), partitionName, currentState, instanceConfig); + } else { + addEntry(resourceName, partitionName, currentState, instanceConfig); + } + } else { + logger.error("Invalid instance name. " + instanceName + + " .Not found in /cluster/configs/. 
instanceName: "); + } + } + } + } + } + } + + /** + * Class to store instances, partitions and their states for each resource. + */ + class ResourceInfo { + // store PartitionInfo for each partition + Map<String, PartitionInfo> partitionInfoMap; + // stores the Set of Instances in a given state + Map<String, Set<InstanceConfig>> stateInfoMap; + + public ResourceInfo() { + partitionInfoMap = new HashMap<>(); + stateInfoMap = new HashMap<>(); + } + + public void addEntry(String stateUnitKey, String state, InstanceConfig config) { + if (!stateInfoMap.containsKey(state)) { + stateInfoMap.put(state, new TreeSet<>(INSTANCE_CONFIG_COMPARATOR)); + } + Set<InstanceConfig> set = stateInfoMap.get(state); + set.add(config); + + if (!partitionInfoMap.containsKey(stateUnitKey)) { + partitionInfoMap.put(stateUnitKey, new PartitionInfo()); + } + PartitionInfo stateUnitKeyInfo = partitionInfoMap.get(stateUnitKey); + stateUnitKeyInfo.addEntry(state, config); + } + + public Set<InstanceConfig> getInstances(String state) { + return stateInfoMap.get(state); + } + + PartitionInfo get(String stateUnitKey) { + return partitionInfoMap.get(stateUnitKey); + } + } + + /** + * Class to store instances, partitions and their states for each resource group. + */ + class ResourceGroupInfo { + // aggregated partitions and instances info for all resources in the resource group. + ResourceInfo aggregatedResourceInfo; + + // <ResourceTag, ResourceInfo> maps resource tag to the resource with the tag + // in this resource group. + // Each ResourceInfo saves only partitions and instances for that resource. 
+ Map<String, ResourceInfo> tagToResourceMap; + + public ResourceGroupInfo() { + aggregatedResourceInfo = new ResourceInfo(); + tagToResourceMap = new HashMap<>(); + } + + public void addEntry(String resourceTag, String stateUnitKey, String state, InstanceConfig config) { + // add the new entry to the aggregated resource info + aggregatedResourceInfo.addEntry(stateUnitKey, state, config); + + // add the entry to the resourceInfo with given tag + if (!tagToResourceMap.containsKey(resourceTag)) { + tagToResourceMap.put(resourceTag, new ResourceInfo()); + } + ResourceInfo resourceInfo = tagToResourceMap.get(resourceTag); + resourceInfo.addEntry(stateUnitKey, state, config); + } + + public Set<InstanceConfig> getInstances(String state) { + return aggregatedResourceInfo.getInstances(state); + } + + public Set<InstanceConfig> getInstances(String state, String resourceTag) { + ResourceInfo resourceInfo = tagToResourceMap.get(resourceTag); + if (resourceInfo != null) { + return resourceInfo.getInstances(state); + } + + return null; + } + + PartitionInfo get(String stateUnitKey) { + return aggregatedResourceInfo.get(stateUnitKey); + } + + PartitionInfo get(String stateUnitKey, String resourceTag) { + ResourceInfo resourceInfo = tagToResourceMap.get(resourceTag); + if (resourceInfo == null) { + return null; + } + + return resourceInfo.get(stateUnitKey); + } + } + + class PartitionInfo { + Map<String, List<InstanceConfig>> stateInfoMap; + + public PartitionInfo() { + stateInfoMap = new HashMap<>(); + } + + public void addEntry(String state, InstanceConfig config) { + if (!stateInfoMap.containsKey(state)) { + stateInfoMap.put(state, new ArrayList<InstanceConfig>()); + } + List<InstanceConfig> list = stateInfoMap.get(state); + list.add(config); + } + + List<InstanceConfig> get(String state) { + return stateInfoMap.get(state); + } + + boolean containsState(String state) { + return stateInfoMap.containsKey(state); + } + } + + private static Comparator<InstanceConfig> 
INSTANCE_CONFIG_COMPARATOR = + new Comparator<InstanceConfig>() { + @Override + public int compare(InstanceConfig o1, InstanceConfig o2) { + if (o1 == o2) { + return 0; + } + if (o1 == null) { + return -1; + } + if (o2 == null) { + return 1; + } + + int compareTo = o1.getHostName().compareTo(o2.getHostName()); + if (compareTo == 0) { + return o1.getPort().compareTo(o2.getPort()); + } + + return compareTo; + } + }; +} http://git-wip-us.apache.org/repos/asf/helix/blob/093f7ab9/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java index 79a8ed0..a89636b 100644 --- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java +++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java @@ -19,34 +19,60 @@ package org.apache.helix.spectator; * under the License. 
*/ -import java.util.ArrayList; -import java.util.Collections; -import java.util.Comparator; -import java.util.HashMap; -import java.util.HashSet; +import java.util.Collection; import java.util.List; -import java.util.Map; import java.util.Set; -import java.util.TreeSet; import java.util.concurrent.atomic.AtomicReference; -import org.apache.helix.ConfigChangeListener; -import org.apache.helix.ExternalViewChangeListener; +import org.apache.helix.HelixConstants; +import org.apache.helix.HelixException; +import org.apache.helix.HelixManager; +import org.apache.helix.api.listeners.ConfigChangeListener; +import org.apache.helix.api.listeners.InstanceConfigChangeListener; +import org.apache.helix.api.listeners.ExternalViewChangeListener; import org.apache.helix.HelixDataAccessor; import org.apache.helix.NotificationContext; -import org.apache.helix.PropertyKey.Builder; +import org.apache.helix.PropertyKey; +import org.apache.helix.api.listeners.LiveInstanceChangeListener; +import org.apache.helix.api.listeners.PreFetch; +import org.apache.helix.common.ClusterEventProcessor; +import org.apache.helix.controller.stages.AttributeName; +import org.apache.helix.controller.stages.ClusterEvent; +import org.apache.helix.controller.stages.ClusterEventType; import org.apache.helix.model.ExternalView; import org.apache.helix.model.InstanceConfig; +import org.apache.helix.model.LiveInstance; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class RoutingTableProvider implements ExternalViewChangeListener, ConfigChangeListener { +public class RoutingTableProvider implements ExternalViewChangeListener, InstanceConfigChangeListener, + ConfigChangeListener, LiveInstanceChangeListener { private static final Logger logger = LoggerFactory.getLogger(RoutingTableProvider.class); private final AtomicReference<RoutingTable> _routingTableRef; + private final HelixManager _helixManager; + private final RouterUpdater _routerUpdater; public RoutingTableProvider() { - _routingTableRef 
= new AtomicReference<RoutingTableProvider.RoutingTable>(new RoutingTable()); + this(null); + } + public RoutingTableProvider(HelixManager helixManager) throws HelixException { + _routingTableRef = new AtomicReference<>(new RoutingTable()); + _helixManager = helixManager; + String clusterName = null; + if (_helixManager != null) { + clusterName = _helixManager.getClusterName(); + try { + _helixManager.addExternalViewChangeListener(this); + _helixManager.addInstanceConfigChangeListener(this); + _helixManager.addLiveInstanceChangeListener(this); + } catch (Exception e) { + logger.error("Failed to attach listeners to HelixManager!"); + throw new HelixException("Failed to attach listeners to HelixManager!", e); + } + } + _routerUpdater = new RouterUpdater(clusterName); + _routerUpdater.start(); } /** @@ -75,19 +101,7 @@ public class RoutingTableProvider implements ExternalViewChangeListener, ConfigC * @return empty list if there is no instance in a given state */ public List<InstanceConfig> getInstancesForResource(String resourceName, String partitionName, String state) { - List<InstanceConfig> instanceList = null; - RoutingTable _routingTable = _routingTableRef.get(); - ResourceInfo resourceInfo = _routingTable.get(resourceName); - if (resourceInfo != null) { - PartitionInfo keyInfo = resourceInfo.get(partitionName); - if (keyInfo != null) { - instanceList = keyInfo.get(state); - } - } - if (instanceList == null) { - instanceList = Collections.emptyList(); - } - return instanceList; + return _routingTableRef.get().getInstancesForResource(resourceName, partitionName, state); } /** @@ -105,19 +119,7 @@ public class RoutingTableProvider implements ExternalViewChangeListener, ConfigC */ public List<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName, String partitionName, String state) { - List<InstanceConfig> instanceList = null; - RoutingTable _routingTable = _routingTableRef.get(); - ResourceGroupInfo resourceGroupInfo = 
_routingTable.getResourceGroup(resourceGroupName); - if (resourceGroupInfo != null) { - PartitionInfo keyInfo = resourceGroupInfo.get(partitionName); - if (keyInfo != null) { - instanceList = keyInfo.get(state); - } - } - if (instanceList == null) { - instanceList = Collections.emptyList(); - } - return instanceList; + return _routingTableRef.get().getInstancesForResourceGroup(resourceGroupName, partitionName, state); } /** @@ -134,25 +136,10 @@ public class RoutingTableProvider implements ExternalViewChangeListener, ConfigC * * @return empty list if there is no instance in a given state */ - public List<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName, String partitionName, - String state, List<String> resourceTags) { - RoutingTable _routingTable = _routingTableRef.get(); - ResourceGroupInfo resourceGroupInfo = _routingTable.getResourceGroup(resourceGroupName); - List<InstanceConfig> instanceList = null; - if (resourceGroupInfo != null) { - instanceList = new ArrayList<InstanceConfig>(); - for (String tag : resourceTags) { - PartitionInfo keyInfo = resourceGroupInfo.get(partitionName, tag); - if (keyInfo != null && keyInfo.containsState(state)) { - instanceList.addAll(keyInfo.get(state)); - } - } - } - if (instanceList == null) { - return Collections.emptyList(); - } - - return instanceList; + public List<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName, + String partitionName, String state, List<String> resourceTags) { + return _routingTableRef.get() + .getInstancesForResourceGroup(resourceGroupName, partitionName, state, resourceTags); } /** @@ -175,16 +162,7 @@ public class RoutingTableProvider implements ExternalViewChangeListener, ConfigC * @return empty list if there is no instance in a given state */ public Set<InstanceConfig> getInstancesForResource(String resourceName, String state) { - Set<InstanceConfig> instanceSet = null; - RoutingTable routingTable = _routingTableRef.get(); - ResourceInfo resourceInfo = 
routingTable.get(resourceName); - if (resourceInfo != null) { - instanceSet = resourceInfo.getInstances(state); - } - if (instanceSet == null) { - instanceSet = Collections.emptySet(); - } - return instanceSet; + return _routingTableRef.get().getInstancesForResource(resourceName, state); } /** @@ -196,16 +174,7 @@ public class RoutingTableProvider implements ExternalViewChangeListener, ConfigC * @return empty list if there is no instance in a given state */ public Set<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName, String state) { - Set<InstanceConfig> instanceSet = null; - RoutingTable _routingTable = _routingTableRef.get(); - ResourceGroupInfo resourceGroupInfo = _routingTable.getResourceGroup(resourceGroupName); - if (resourceGroupInfo != null) { - instanceSet = resourceGroupInfo.getInstances(state); - } - if (instanceSet == null) { - instanceSet = Collections.emptySet(); - } - return instanceSet; + return _routingTableRef.get().getInstancesForResourceGroup(resourceGroupName, state); } /** @@ -219,274 +188,122 @@ public class RoutingTableProvider implements ExternalViewChangeListener, ConfigC */ public Set<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName, String state, List<String> resourceTags) { - Set<InstanceConfig> instanceSet = null; - RoutingTable _routingTable = _routingTableRef.get(); - ResourceGroupInfo resourceGroupInfo = _routingTable.getResourceGroup(resourceGroupName); - if (resourceGroupInfo != null) { - instanceSet = new HashSet<InstanceConfig>(); - for (String tag : resourceTags) { - Set<InstanceConfig> instances = resourceGroupInfo.getInstances(state, tag); - if (instances != null) { - instanceSet.addAll(resourceGroupInfo.getInstances(state, tag)); - } - } - } - if (instanceSet == null) { - return Collections.emptySet(); - } - return instanceSet; + return _routingTableRef.get().getInstancesForResourceGroup(resourceGroupName, state, resourceTags); + } + + /** + * Return all liveInstances in the cluster 
now. + * @return + */ + public Collection<LiveInstance> getLiveInstances() { + return _routingTableRef.get().getLiveInstances(); + } + + /** + * Return all instance's config in this cluster. + * @return + */ + public Collection<InstanceConfig> getInstanceConfigs() { + return _routingTableRef.get().getInstanceConfigs(); } @Override + @PreFetch(enabled = false) public void onExternalViewChange(List<ExternalView> externalViewList, NotificationContext changeContext) { - // session has expired clean up the routing table - if (changeContext.getType() == NotificationContext.Type.FINALIZE) { - logger.info("Resetting the routing table. "); - RoutingTable newRoutingTable = new RoutingTable(); - _routingTableRef.set(newRoutingTable); - return; + // Refresh with full list of external view. + // keep this here for back-compatibility + if (externalViewList != null && externalViewList.size() > 0) { + refresh(externalViewList, changeContext); + } else { + _routerUpdater.queueEvent(changeContext, ClusterEventType.ExternalViewChange, + HelixConstants.ChangeType.EXTERNAL_VIEW); } - refresh(externalViewList, changeContext); } @Override - public void onConfigChange(List<InstanceConfig> configs, NotificationContext changeContext) { - // session has expired clean up the routing table - if (changeContext.getType() == NotificationContext.Type.FINALIZE) { - logger.info("Resetting the routing table. 
"); - RoutingTable newRoutingTable = new RoutingTable(); - _routingTableRef.set(newRoutingTable); - return; - } + @PreFetch(enabled = false) + public void onInstanceConfigChange(List<InstanceConfig> configs, + NotificationContext changeContext) { + _routerUpdater.queueEvent(changeContext, ClusterEventType.InstanceConfigChange, + HelixConstants.ChangeType.INSTANCE_CONFIG); + } - HelixDataAccessor accessor = changeContext.getManager().getHelixDataAccessor(); - Builder keyBuilder = accessor.keyBuilder(); - List<ExternalView> externalViewList = accessor.getChildValues(keyBuilder.externalViews()); - refresh(externalViewList, changeContext); + @Override + @PreFetch(enabled = false) + public void onConfigChange(List<InstanceConfig> configs, + NotificationContext changeContext) { + onInstanceConfigChange(configs, changeContext); } - private void refresh(List<ExternalView> externalViewList, NotificationContext changeContext) { - HelixDataAccessor accessor = changeContext.getManager().getHelixDataAccessor(); - Builder keyBuilder = accessor.keyBuilder(); + @Override + @PreFetch(enabled = false) + public void onLiveInstanceChange(List<LiveInstance> liveInstances, + NotificationContext changeContext) { + _routerUpdater.queueEvent(changeContext, ClusterEventType.LiveInstanceChange, + HelixConstants.ChangeType.LIVE_INSTANCE); + } - List<InstanceConfig> configList = accessor.getChildValues(keyBuilder.instanceConfigs()); - Map<String, InstanceConfig> instanceConfigMap = new HashMap<String, InstanceConfig>(); - for (InstanceConfig config : configList) { - instanceConfigMap.put(config.getId(), config); - } + private void reset() { + logger.info("Resetting the routing table."); RoutingTable newRoutingTable = new RoutingTable(); - if (externalViewList != null) { - for (ExternalView extView : externalViewList) { - String resourceName = extView.getId(); - for (String partitionName : extView.getPartitionSet()) { - Map<String, String> stateMap = extView.getStateMap(partitionName); - for 
(String instanceName : stateMap.keySet()) { - String currentState = stateMap.get(instanceName); - if (instanceConfigMap.containsKey(instanceName)) { - InstanceConfig instanceConfig = instanceConfigMap.get(instanceName); - if (extView.isGroupRoutingEnabled()) { - newRoutingTable.addEntry(resourceName, extView.getResourceGroupName(), - extView.getInstanceGroupTag(), partitionName, currentState, instanceConfig); - } else { - newRoutingTable.addEntry(resourceName, partitionName, currentState, instanceConfig); - } - } else { - logger.error("Invalid instance name." + instanceName - + " .Not found in /cluster/configs/. instanceName: "); - } - } - } - } - } _routingTableRef.set(newRoutingTable); } - class RoutingTable { - // mapping a resourceName to the ResourceInfo - private final Map<String, ResourceInfo> resourceInfoMap; - - // mapping a resource group name to a resourceGroupInfo - private final Map<String, ResourceGroupInfo> resourceGroupInfoMap; - - public RoutingTable() { - resourceInfoMap = new HashMap<String, ResourceInfo>(); - resourceGroupInfoMap = new HashMap<String, ResourceGroupInfo>(); - } - - public void addEntry(String resourceName, String partitionName, String state, - InstanceConfig config) { - if (!resourceInfoMap.containsKey(resourceName)) { - resourceInfoMap.put(resourceName, new ResourceInfo()); - } - ResourceInfo resourceInfo = resourceInfoMap.get(resourceName); - resourceInfo.addEntry(partitionName, state, config); - } - - /** - * add an entry with a resource with resourceGrouping enabled. 
- */ - public void addEntry(String resourceName, String resourceGroupName, String resourceTag, - String partitionName, String state, InstanceConfig config) { - addEntry(resourceName, partitionName, state, config); - - if (!resourceGroupInfoMap.containsKey(resourceGroupName)) { - resourceGroupInfoMap.put(resourceGroupName, new ResourceGroupInfo()); - } - - ResourceGroupInfo resourceGroupInfo = resourceGroupInfoMap.get(resourceGroupName); - resourceGroupInfo.addEntry(resourceTag, partitionName, state, config); - } - - ResourceInfo get(String resourceName) { - return resourceInfoMap.get(resourceName); - } + public void refresh(List<ExternalView> externalViewList, NotificationContext changeContext) { + HelixDataAccessor accessor = changeContext.getManager().getHelixDataAccessor(); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); - ResourceGroupInfo getResourceGroup(String resourceGroupName) { - return resourceGroupInfoMap.get(resourceGroupName); - } + List<InstanceConfig> configList = accessor.getChildValues(keyBuilder.instanceConfigs()); + List<LiveInstance> liveInstances = accessor.getChildValues(keyBuilder.liveInstances()); + refresh(externalViewList, configList, liveInstances); } - private static Comparator<InstanceConfig> INSTANCE_CONFIG_COMPARATOR = - new Comparator<InstanceConfig>() { - @Override - public int compare(InstanceConfig o1, InstanceConfig o2) { - if (o1 == o2) { - return 0; - } - if (o1 == null) { - return -1; - } - if (o2 == null) { - return 1; - } - - int compareTo = o1.getHostName().compareTo(o2.getHostName()); - if (compareTo == 0) { - return o1.getPort().compareTo(o2.getPort()); - } else { - return compareTo; - } - - } - }; - - /** - * Class to store instances, partitions and their states for each resource. 
- */ - class ResourceInfo { - // store PartitionInfo for each partition - Map<String, PartitionInfo> partitionInfoMap; - // stores the Set of Instances in a given state - Map<String, Set<InstanceConfig>> stateInfoMap; - - public ResourceInfo() { - partitionInfoMap = new HashMap<String, RoutingTableProvider.PartitionInfo>(); - stateInfoMap = new HashMap<String, Set<InstanceConfig>>(); - } - - public void addEntry(String stateUnitKey, String state, InstanceConfig config) { - // add - if (!stateInfoMap.containsKey(state)) { - stateInfoMap.put(state, new TreeSet<InstanceConfig>(INSTANCE_CONFIG_COMPARATOR)); - } - Set<InstanceConfig> set = stateInfoMap.get(state); - set.add(config); - - if (!partitionInfoMap.containsKey(stateUnitKey)) { - partitionInfoMap.put(stateUnitKey, new PartitionInfo()); - } - PartitionInfo stateUnitKeyInfo = partitionInfoMap.get(stateUnitKey); - stateUnitKeyInfo.addEntry(state, config); - } - - public Set<InstanceConfig> getInstances(String state) { - Set<InstanceConfig> instanceSet = stateInfoMap.get(state); - return instanceSet; - } - - PartitionInfo get(String stateUnitKey) { - return partitionInfoMap.get(stateUnitKey); - } + public void refresh(Collection<ExternalView> externalViews, + Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances) { + RoutingTable newRoutingTable = new RoutingTable(externalViews, instanceConfigs, liveInstances); + _routingTableRef.set(newRoutingTable); } - /** - * Class to store instances, partitions and their states for each resource group. - */ - class ResourceGroupInfo { - // aggregated partitions and instances info for all resources in the resource group. - ResourceInfo aggregatedResourceInfo; - - // <ResourceTag, ResourceInfo> maps resource tag to the resource with the tag - // in this resource group. - // Each ResourceInfo saves only partitions and instances for that resource. 
- Map<String, ResourceInfo> tagToResourceMap; - - public ResourceGroupInfo() { - aggregatedResourceInfo = new ResourceInfo(); - tagToResourceMap = new HashMap<String, ResourceInfo>(); - } - - public void addEntry(String resourceTag, String stateUnitKey, String state, InstanceConfig config) { - // add the new entry to the aggregated resource info - aggregatedResourceInfo.addEntry(stateUnitKey, state, config); - - // add the entry to the resourceInfo with given tag - if (!tagToResourceMap.containsKey(resourceTag)) { - tagToResourceMap.put(resourceTag, new ResourceInfo()); - } - ResourceInfo resourceInfo = tagToResourceMap.get(resourceTag); - resourceInfo.addEntry(stateUnitKey, state, config); - } - - public Set<InstanceConfig> getInstances(String state) { - return aggregatedResourceInfo.getInstances(state); - } - - public Set<InstanceConfig> getInstances(String state, String resourceTag) { - ResourceInfo resourceInfo = tagToResourceMap.get(resourceTag); - if (resourceInfo != null) { - return resourceInfo.getInstances(state); - } - - return null; - } - - PartitionInfo get(String stateUnitKey) { - return aggregatedResourceInfo.get(stateUnitKey); - } - - PartitionInfo get(String stateUnitKey, String resourceTag) { - ResourceInfo resourceInfo = tagToResourceMap.get(resourceTag); - if (resourceInfo == null) { - return null; + private class RouterUpdater extends ClusterEventProcessor { + private final RoutingDataCache _dataCache; + + public RouterUpdater(String clusterName) { + super("Helix-RouterUpdater-event_process"); + _dataCache = new RoutingDataCache(clusterName); + } + + @Override + protected void handleEvent(ClusterEvent event) { + NotificationContext changeContext = event.getAttribute(AttributeName.changeContext.name()); + // session has expired clean up the routing table + if (changeContext.getType() == NotificationContext.Type.FINALIZE) { + reset(); + } else { + // refresh routing table. 
+ HelixManager manager = event.getAttribute(AttributeName.helixmanager.name()); + if (manager == null) { + logger.error("HelixManager is null for router update event : " + event); + throw new HelixException("HelixManager is null for router update event."); + } + _dataCache.refresh(manager.getHelixDataAccessor()); + refresh(_dataCache.getExternalViews().values(), _dataCache.getInstanceConfigMap().values(), + _dataCache.getLiveInstances().values()); } - - return resourceInfo.get(stateUnitKey); } - } - class PartitionInfo { - Map<String, List<InstanceConfig>> stateInfoMap; - - public PartitionInfo() { - stateInfoMap = new HashMap<String, List<InstanceConfig>>(); - } - - public void addEntry(String state, InstanceConfig config) { - if (!stateInfoMap.containsKey(state)) { - stateInfoMap.put(state, new ArrayList<InstanceConfig>()); + public void queueEvent(NotificationContext context, ClusterEventType eventType, + HelixConstants.ChangeType changeType) { + ClusterEvent event = new ClusterEvent(_clusterName, eventType); + if (context == null || context.getType() != NotificationContext.Type.CALLBACK) { + _dataCache.requireFullRefresh(); + } else { + _dataCache.notifyDataChange(changeType, context.getPathChanged()); } - List<InstanceConfig> list = stateInfoMap.get(state); - list.add(config); - } - - List<InstanceConfig> get(String state) { - return stateInfoMap.get(state); - } - boolean containsState(String state) { - return stateInfoMap.containsKey(state); + event.addAttribute(AttributeName.helixmanager.name(), context.getManager()); + event.addAttribute(AttributeName.changeContext.name(), context); + queueEvent(event); } } } http://git-wip-us.apache.org/repos/asf/helix/blob/093f7ab9/helix-core/src/test/java/org/apache/helix/TestRoutingTable.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/TestRoutingTable.java b/helix-core/src/test/java/org/apache/helix/TestRoutingTable.java index 
9fc0b33..11f3701 100644 --- a/helix-core/src/test/java/org/apache/helix/TestRoutingTable.java +++ b/helix-core/src/test/java/org/apache/helix/TestRoutingTable.java @@ -138,7 +138,7 @@ public class TestRoutingTable { } @Test() - public void testStateUnitGroupDeletion() { + public void testStateUnitGroupDeletion() throws InterruptedException { List<InstanceConfig> instances; RoutingTableProvider routingTable = new RoutingTableProvider(); @@ -155,6 +155,7 @@ public class TestRoutingTable { externalViewList.clear(); routingTable.onExternalViewChange(externalViewList, changeContext); + Thread.sleep(100); instances = routingTable.getInstances("TESTDB", "TESTDB_0", "MASTER"); AssertJUnit.assertNotNull(instances); AssertJUnit.assertEquals(instances.size(), 0); http://git-wip-us.apache.org/repos/asf/helix/blob/093f7ab9/helix-core/src/test/java/org/apache/helix/controller/stages/TestClusterEventBlockingQueue.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestClusterEventBlockingQueue.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestClusterEventBlockingQueue.java index 802b39c..da86495 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestClusterEventBlockingQueue.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestClusterEventBlockingQueue.java @@ -25,6 +25,7 @@ import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import org.apache.helix.common.ClusterEventBlockingQueue; import org.testng.Assert; import org.testng.annotations.Test; http://git-wip-us.apache.org/repos/asf/helix/blob/093f7ab9/helix-core/src/test/java/org/apache/helix/integration/Spectator/TestRoutingTableProvider.java ---------------------------------------------------------------------- diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/Spectator/TestRoutingTableProvider.java b/helix-core/src/test/java/org/apache/helix/integration/Spectator/TestRoutingTableProvider.java new file mode 100644 index 0000000..aa731e5 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/Spectator/TestRoutingTableProvider.java @@ -0,0 +1,173 @@ +package org.apache.helix.integration.Spectator; + +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Date; +import java.util.HashSet; +import java.util.List; +import java.util.Set; +import org.apache.helix.HelixManager; +import org.apache.helix.HelixManagerFactory; +import org.apache.helix.InstanceType; +import org.apache.helix.integration.common.ZkIntegrationTestBase; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.model.BuiltInStateModelDefinitions; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.spectator.RoutingTableProvider; +import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; +import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier; +import org.mockito.internal.util.collections.Sets; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class TestRoutingTableProvider extends ZkIntegrationTestBase { + + static final String STATE_MODEL = BuiltInStateModelDefinitions.MasterSlave.name(); + static final String TEST_DB = "TestDB"; + static final String CLASS_NAME = TestRoutingTableProvider.class.getSimpleName(); + static final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME; + static final int PARTICIPANT_NUMBER = 3; + static final int PARTICIPANT_START_PORT = 12918; + + static final int PARTITION_NUMBER = 20; + static final int REPLICA_NUMBER = 3; 
+ + private HelixManager _spectator; + private List<MockParticipantManager> _participants = new ArrayList<MockParticipantManager>(); + private List<String> _instances = new ArrayList<>(); + private ClusterControllerManager _controller; + private HelixClusterVerifier _clusterVerifier; + private RoutingTableProvider _routingTableProvider; + private RoutingTableProvider _routingTableProvider2; + + @BeforeClass + public void beforeClass() throws Exception { + System.out.println( + "START " + getShortClassName() + " at " + new Date(System.currentTimeMillis())); + + // setup storage cluster + _gSetupTool.addCluster(CLUSTER_NAME, true); + + for (int i = 0; i < PARTICIPANT_NUMBER; i++) { + String instance = PARTICIPANT_PREFIX + "_" + (PARTICIPANT_START_PORT + i); + _gSetupTool.addInstanceToCluster(CLUSTER_NAME, instance); + _instances.add(instance); + } + + // start dummy participants + for (int i = 0; i < PARTICIPANT_NUMBER; i++) { + MockParticipantManager participant = + new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, _instances.get(i)); + participant.syncStart(); + _participants.add(participant); + } + + createDBInSemiAuto(_gSetupTool, CLUSTER_NAME, TEST_DB, _instances, + STATE_MODEL, PARTITION_NUMBER, REPLICA_NUMBER); + + // start controller + String controllerName = CONTROLLER_PREFIX + "_0"; + _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); + _controller.syncStart(); + + // start speculator + _routingTableProvider = new RoutingTableProvider(); + _spectator = HelixManagerFactory + .getZKHelixManager(CLUSTER_NAME, "spectator", InstanceType.SPECTATOR, ZK_ADDR); + _spectator.connect(); + _spectator.addExternalViewChangeListener(_routingTableProvider); + _spectator.addLiveInstanceChangeListener(_routingTableProvider); + _spectator.addInstanceConfigChangeListener(_routingTableProvider); + + _routingTableProvider2 = new RoutingTableProvider(_spectator); + + _clusterVerifier = new 
BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkClient(_gZkClient).build(); + Assert.assertTrue(_clusterVerifier.verify()); + } + + @AfterClass + public void afterClass() { + // stop participants + for (MockParticipantManager p : _participants) { + p.syncStop(); + } + _controller.syncStop(); + _spectator.disconnect(); + _gSetupTool.deleteCluster(CLUSTER_NAME); + } + + @Test + public void testRoutingTable() { + Assert.assertEquals(_routingTableProvider.getLiveInstances().size(), _instances.size()); + Assert.assertEquals(_routingTableProvider.getInstanceConfigs().size(), _instances.size()); + + Assert.assertEquals(_routingTableProvider2.getLiveInstances().size(), _instances.size()); + Assert.assertEquals(_routingTableProvider2.getInstanceConfigs().size(), _instances.size()); + + validateRoutingTable(_routingTableProvider, Sets.newSet(_instances.get(0)), + Sets.newSet(_instances.get(1), _instances.get(2))); + validateRoutingTable(_routingTableProvider2, Sets.newSet(_instances.get(0)), + Sets.newSet(_instances.get(1), _instances.get(2))); + } + + @Test(dependsOnMethods = { "testRoutingTable" }) + public void testDisableInstance() throws InterruptedException { + // disable the master instance + String prevMasterInstance = _instances.get(0); + _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, prevMasterInstance, false); + Assert.assertTrue(_clusterVerifier.verify()); + + validateRoutingTable(_routingTableProvider, Sets.newSet(_instances.get(1)), + Sets.newSet(_instances.get(2))); + validateRoutingTable(_routingTableProvider2, Sets.newSet(_instances.get(1)), + Sets.newSet(_instances.get(2))); + } + + @Test(dependsOnMethods = { "testDisableInstance" }) + public void testShutdownInstance() throws InterruptedException { + // reenable the first instance + String prevMasterInstance = _instances.get(0); + _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, prevMasterInstance, true); + + // shutdown second instance. 
+ _participants.get(1).syncStop(); + + Assert.assertTrue(_clusterVerifier.verify()); + + Assert.assertEquals(_routingTableProvider.getLiveInstances().size(), _instances.size() - 1); + Assert.assertEquals(_routingTableProvider.getInstanceConfigs().size(), _instances.size()); + + Assert.assertEquals(_routingTableProvider2.getLiveInstances().size(), _instances.size() - 1); + Assert.assertEquals(_routingTableProvider2.getInstanceConfigs().size(), _instances.size()); + + validateRoutingTable(_routingTableProvider, Sets.newSet(_instances.get(0)), + Sets.newSet(_instances.get(2))); + validateRoutingTable(_routingTableProvider2, Sets.newSet(_instances.get(0)), + Sets.newSet(_instances.get(2))); + } + + private void validateRoutingTable(RoutingTableProvider routingTableProvider, + Set<String> masterNodes, Set<String> slaveNodes) { + IdealState is = + _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, TEST_DB); + for (String p : is.getPartitionSet()) { + Set<String> masterInstances = new HashSet<>(); + for (InstanceConfig config : routingTableProvider.getInstances(TEST_DB, p, "MASTER")) { + masterInstances.add(config.getInstanceName()); + } + + Set<String> slaveInstances = new HashSet<>(); + for (InstanceConfig config : routingTableProvider.getInstances(TEST_DB, p, "SLAVE")) { + slaveInstances.add(config.getInstanceName()); + } + + Assert.assertEquals(masterInstances, masterNodes); + Assert.assertEquals(slaveInstances, slaveNodes); + } + } +} + http://git-wip-us.apache.org/repos/asf/helix/blob/093f7ab9/helix-core/src/test/java/org/apache/helix/integration/TestBasicSpectator.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestBasicSpectator.java b/helix-core/src/test/java/org/apache/helix/integration/TestBasicSpectator.java index a24ded4..0c61b27 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestBasicSpectator.java +++ 
b/helix-core/src/test/java/org/apache/helix/integration/TestBasicSpectator.java @@ -52,6 +52,7 @@ public class TestBasicSpectator extends ZkStandAloneCMTestBase implements boolean result = ClusterStateVerifier.verifyByPolling(new ClusterStateVerifier.BestPossAndExtViewZkVerifier( ZK_ADDR, CLUSTER_NAME)); + Assert.assertTrue(result); Assert.assertTrue(_externalViewChanges.containsKey("NextDB")); http://git-wip-us.apache.org/repos/asf/helix/blob/093f7ab9/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java b/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java index e8640f4..47f8af9 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestResourceGroupEndtoEnd.java @@ -33,6 +33,7 @@ import org.apache.helix.manager.zk.ZKHelixAdmin; import org.apache.helix.manager.zk.ZKHelixManager; import org.apache.helix.manager.zk.ZkClient; import org.apache.helix.mock.participant.DummyProcess; +import org.apache.helix.model.BuiltInStateModelDefinitions; import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.OnlineOfflineSMD; @@ -90,9 +91,9 @@ public class TestResourceGroupEndtoEnd extends ZkIntegrationTestBase { for (String tag : instanceGroupTags) { List<String> instances = _admin.getInstancesInClusterWithTag(CLUSTER_NAME, tag); - IdealState idealState = - createIdealState(TEST_DB, tag, instances, PARTITIONS, _replica, - IdealState.RebalanceMode.CUSTOMIZED.toString()); + IdealState idealState = createIdealState(TEST_DB, tag, instances, PARTITIONS, _replica, + IdealState.RebalanceMode.CUSTOMIZED.toString(), + BuiltInStateModelDefinitions.OnlineOffline.name()); 
_gSetupTool.addResourceToCluster(CLUSTER_NAME, idealState.getResourceName(), idealState); } @@ -140,28 +141,6 @@ public class TestResourceGroupEndtoEnd extends ZkIntegrationTestBase { _spectator.disconnect(); } - public IdealState createIdealState(String resourceGroupName, String instanceGroupTag, - List<String> instanceNames, int numPartition, int replica, String rebalanceMode) { - IdealState is = - _gSetupTool.createIdealStateForResourceGroup(resourceGroupName, instanceGroupTag, - numPartition, replica, rebalanceMode, "OnlineOffline"); - - // setup initial partition->instance mapping. - int nodeIdx = 0; - int numNode = instanceNames.size(); - assert (numNode >= replica); - for (int i = 0; i < numPartition; i++) { - String partitionName = resourceGroupName + "_" + i; - for (int j = 0; j < replica; j++) { - is.setPartitionState(partitionName, instanceNames.get((nodeIdx + j) % numNode), - OnlineOfflineSMD.States.ONLINE.toString()); - } - nodeIdx++; - } - - return is; - } - private void addInstanceGroup(String clusterName, String instanceTag, int numInstance) { List<String> instances = new ArrayList<String>(); for (int i = 0; i < numInstance; i++) { http://git-wip-us.apache.org/repos/asf/helix/blob/093f7ab9/helix-core/src/test/java/org/apache/helix/integration/common/ZkIntegrationTestBase.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/common/ZkIntegrationTestBase.java b/helix-core/src/test/java/org/apache/helix/integration/common/ZkIntegrationTestBase.java index 4e49502..cee331e 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/common/ZkIntegrationTestBase.java +++ b/helix-core/src/test/java/org/apache/helix/integration/common/ZkIntegrationTestBase.java @@ -21,6 +21,7 @@ package org.apache.helix.integration.common; import java.lang.reflect.Method; import java.util.Date; +import java.util.List; import java.util.Map; import java.util.Set; import 
java.util.logging.Level; @@ -48,6 +49,7 @@ import org.apache.helix.model.ExternalView; import org.apache.helix.model.IdealState; import org.apache.helix.model.InstanceConfig; import org.apache.helix.model.LiveInstance; +import org.apache.helix.model.OnlineOfflineSMD; import org.apache.helix.model.StateModelDefinition; import org.apache.helix.model.builder.ConfigScopeBuilder; import org.apache.helix.tools.ClusterSetup; @@ -217,6 +219,43 @@ public class ZkIntegrationTestBase { return idealState; } + protected IdealState createIdealState(String resourceGroupName, String instanceGroupTag, + List<String> instanceNames, int numPartition, int replica, String rebalanceMode, + String stateModelDef) { + IdealState is = _gSetupTool + .createIdealStateForResourceGroup(resourceGroupName, instanceGroupTag, numPartition, + replica, rebalanceMode, stateModelDef); + + // setup initial partition->instance mapping. + int nodeIdx = 0; + int numNode = instanceNames.size(); + assert (numNode >= replica); + for (int i = 0; i < numPartition; i++) { + String partitionName = resourceGroupName + "_" + i; + for (int j = 0; j < replica; j++) { + is.setPartitionState(partitionName, instanceNames.get((nodeIdx + j) % numNode), + OnlineOfflineSMD.States.ONLINE.toString()); + } + nodeIdx++; + } + + return is; + } + + protected void createDBInSemiAuto(ClusterSetup clusterSetup, String clusterName, String dbName, + List<String> preferenceList, String stateModelDef, int numPartition, int replica) { + clusterSetup.addResourceToCluster(clusterName, dbName, numPartition, stateModelDef, + IdealState.RebalanceMode.SEMI_AUTO.toString()); + clusterSetup.rebalanceStorageCluster(clusterName, dbName, replica); + + IdealState is = + _gSetupTool.getClusterManagementTool().getResourceIdealState(clusterName, dbName); + for (String p : is.getPartitionSet()) { + is.setPreferenceList(p, preferenceList); + } + clusterSetup.getClusterManagementTool().setResourceIdealState(clusterName, dbName, is); + } + /** * Validate 
there should be always minimal active replica and top state replica for each partition. * Also make sure there is always some partitions with only active replica count. http://git-wip-us.apache.org/repos/asf/helix/blob/093f7ab9/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java index 6f7f369..cc168e9 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java +++ b/helix-core/src/test/java/org/apache/helix/integration/manager/MockParticipantManager.java @@ -40,9 +40,9 @@ import org.slf4j.LoggerFactory; public class MockParticipantManager extends ZKHelixManager implements Runnable, ZkTestManager { private static Logger LOG = LoggerFactory.getLogger(MockParticipantManager.class); - private CountDownLatch _startCountDown = new CountDownLatch(1); - private CountDownLatch _stopCountDown = new CountDownLatch(1); - private CountDownLatch _waitStopCompleteCountDown = new CountDownLatch(1); + protected CountDownLatch _startCountDown = new CountDownLatch(1); + protected CountDownLatch _stopCountDown = new CountDownLatch(1); + protected CountDownLatch _waitStopCompleteCountDown = new CountDownLatch(1); protected MockMSModelFactory _msModelFactory = new MockMSModelFactory(null); protected DummyLeaderStandbyStateModelFactory _lsModelFactory = http://git-wip-us.apache.org/repos/asf/helix/blob/093f7ab9/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java 
b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java index e0f83e1..d7ea7a3 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java +++ b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java @@ -95,8 +95,10 @@ public class TestP2PMessageSemiAuto extends ZkIntegrationTestBase { _participants.add(participant); } - createDBInSemiAuto(DB_NAME_1, _instances); - createDBInSemiAuto(DB_NAME_2, _instances); + createDBInSemiAuto(_gSetupTool, CLUSTER_NAME, DB_NAME_1, _instances, + BuiltInStateModelDefinitions.MasterSlave.name(), PARTITION_NUMBER, REPLICA_NUMBER); + createDBInSemiAuto(_gSetupTool, CLUSTER_NAME, DB_NAME_2, _instances, + BuiltInStateModelDefinitions.MasterSlave.name(), PARTITION_NUMBER, REPLICA_NUMBER); // start controller String controllerName = CONTROLLER_PREFIX + "_0"; @@ -110,18 +112,6 @@ public class TestP2PMessageSemiAuto extends ZkIntegrationTestBase { _accessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor); } - private void createDBInSemiAuto(String dbName, List<String> preferenceList) { - _gSetupTool.addResourceToCluster(CLUSTER_NAME, dbName, PARTITION_NUMBER, - BuiltInStateModelDefinitions.MasterSlave.name(), IdealState.RebalanceMode.SEMI_AUTO.toString()); - _gSetupTool.rebalanceStorageCluster(CLUSTER_NAME, dbName, REPLICA_NUMBER); - - IdealState is = _gSetupTool.getClusterManagementTool().getResourceIdealState(CLUSTER_NAME, dbName); - for (String p : is.getPartitionSet()) { - is.setPreferenceList(p, preferenceList); - } - _gSetupTool.getClusterManagementTool().setResourceIdealState(CLUSTER_NAME, dbName, is); - } - @Test public void testP2PStateTransitionDisabled() { // disable the master instance
