Repository: helix Updated Branches: refs/heads/master e1faf2404 -> 0e4163f18
[HELIX-698] Add periodic refresh to RoutingTableProvider There have been incidents where RoutingTableProvider was not getting a proper refresh potentially due to the lag in ZKClient CallbackHandler or connectivity issues. This addition of periodic refresh avoids cases where RoutingTableProvider is severely delayed by initiating periodic refreshes. Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/0e4163f1 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/0e4163f1 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/0e4163f1 Branch: refs/heads/master Commit: 0e4163f18c1274c0f77320698e9dfbf42314810d Parents: e1faf24 Author: Hunter Lee <[email protected]> Authored: Thu Apr 19 13:42:37 2018 -0700 Committer: Hunter Lee <[email protected]> Committed: Thu Apr 19 14:07:52 2018 -0700 ---------------------------------------------------------------------- .../org/apache/helix/NotificationContext.java | 1 + .../common/caches/BasicClusterDataCache.java | 2 +- .../helix/spectator/RoutingTableProvider.java | 225 ++++++++++++------- ...TestRoutingTableProviderPeriodicRefresh.java | 218 ++++++++++++++++++ 4 files changed, 359 insertions(+), 87 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/0e4163f1/helix-core/src/main/java/org/apache/helix/NotificationContext.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/NotificationContext.java b/helix-core/src/main/java/org/apache/helix/NotificationContext.java index dd76b60..9664f66 100644 --- a/helix-core/src/main/java/org/apache/helix/NotificationContext.java +++ b/helix-core/src/main/java/org/apache/helix/NotificationContext.java @@ -188,6 +188,7 @@ public class NotificationContext { public enum Type { INIT, CALLBACK, + PERIODIC_REFRESH, FINALIZE } http://git-wip-us.apache.org/repos/asf/helix/blob/0e4163f1/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java index d6e324d..06fcaf6 100644 --- a/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java +++ b/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java @@ -225,7 +225,7 @@ public class BasicClusterDataCache { */ public void requireFullRefresh() { for(HelixConstants.ChangeType type : HelixConstants.ChangeType.values()) { - _propertyDataChangedMap.put(type, Boolean.valueOf(true)); + _propertyDataChangedMap.put(type, Boolean.TRUE); } } http://git-wip-us.apache.org/repos/asf/helix/blob/0e4163f1/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java index 4076697..f72d66a 100644 --- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java +++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java @@ -26,6 +26,8 @@ import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ScheduledThreadPoolExecutor; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicReference; import org.apache.helix.HelixConstants; @@ -53,25 +55,47 @@ import org.apache.helix.model.LiveInstance; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -public class RoutingTableProvider implements ExternalViewChangeListener, InstanceConfigChangeListener, - ConfigChangeListener, LiveInstanceChangeListener, CurrentStateChangeListener { +public class RoutingTableProvider + implements ExternalViewChangeListener, InstanceConfigChangeListener, ConfigChangeListener, + LiveInstanceChangeListener, CurrentStateChangeListener { private static final Logger logger = LoggerFactory.getLogger(RoutingTableProvider.class); + private static final long DEFAULT_PERIODIC_REFRESH_INTERVAL = 300000; // 5 minutes private final AtomicReference<RoutingTable> _routingTableRef; private final HelixManager _helixManager; private final RouterUpdater _routerUpdater; private final PropertyType _sourceDataType; private final Map<RoutingTableChangeListener, ListenerContext> _routingTableChangeListenerMap; + // For periodic refresh + private long _lastRefreshTimestamp; + private boolean _isPeriodicRefreshEnabled = true; // Default is enabled + private long _periodRefreshInterval; + private ScheduledThreadPoolExecutor _periodicRefreshExecutor; + public RoutingTableProvider() { this(null); } public RoutingTableProvider(HelixManager helixManager) throws HelixException { - this(helixManager, PropertyType.EXTERNALVIEW); + this(helixManager, PropertyType.EXTERNALVIEW, true, DEFAULT_PERIODIC_REFRESH_INTERVAL); } public RoutingTableProvider(HelixManager helixManager, PropertyType sourceDataType) throws HelixException { + this(helixManager, sourceDataType, true, DEFAULT_PERIODIC_REFRESH_INTERVAL); + } + + /** + * Initialize an instance of RoutingTableProvider + * + * @param helixManager + * @param sourceDataType + * @param isPeriodicRefreshEnabled true if periodic refresh is enabled, false otherwise + * @param periodRefreshInterval only effective if isPeriodRefreshEnabled is true + * @throws HelixException + */ + public RoutingTableProvider(HelixManager helixManager, PropertyType sourceDataType, + boolean isPeriodicRefreshEnabled, long periodRefreshInterval) throws HelixException { _routingTableRef = new AtomicReference<>(new RoutingTable()); _helixManager = helixManager; _sourceDataType = sourceDataType; @@ -79,41 +103,43 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc String clusterName = _helixManager != null ? _helixManager.getClusterName() : null; _routerUpdater = new RouterUpdater(clusterName, _sourceDataType); _routerUpdater.start(); + if (_helixManager != null) { switch (_sourceDataType) { - case EXTERNALVIEW: - try { - _helixManager.addExternalViewChangeListener(this); - } catch (Exception e) { - shutdown(); - logger.error("Failed to attach ExternalView Listener to HelixManager!"); - throw new HelixException("Failed to attach ExternalView Listener to HelixManager!", e); - } - break; - - case TARGETEXTERNALVIEW: - // Check whether target external has been enabled or not - if (!_helixManager.getHelixDataAccessor().getBaseDataAccessor().exists( - _helixManager.getHelixDataAccessor().keyBuilder().targetExternalViews().getPath(), 0)) { - shutdown(); - throw new HelixException("Target External View is not enabled!"); - } + case EXTERNALVIEW: + try { + _helixManager.addExternalViewChangeListener(this); + } catch (Exception e) { + shutdown(); + logger.error("Failed to attach ExternalView Listener to HelixManager!"); + throw new HelixException("Failed to attach ExternalView Listener to HelixManager!", e); + } + break; - try { - _helixManager.addTargetExternalViewChangeListener(this); - } catch (Exception e) { - shutdown(); - logger.error("Failed to attach TargetExternalView Listener to HelixManager!"); - throw new HelixException("Failed to attach TargetExternalView Listener to HelixManager!", e); - } - break; + case TARGETEXTERNALVIEW: + // Check whether target external has been enabled or not + if (!_helixManager.getHelixDataAccessor().getBaseDataAccessor().exists( + _helixManager.getHelixDataAccessor().keyBuilder().targetExternalViews().getPath(), 0)) { + shutdown(); + throw new HelixException("Target External View is not enabled!"); + } - case CURRENTSTATES: - // CurrentState change listeners will be added later in LiveInstanceChange call. - break; + try { + _helixManager.addTargetExternalViewChangeListener(this); + } catch (Exception e) { + shutdown(); + logger.error("Failed to attach TargetExternalView Listener to HelixManager!"); + throw new HelixException("Failed to attach TargetExternalView Listener to HelixManager!", + e); + } + break; - default: - throw new HelixException("Unsupported source data type: " + sourceDataType); + case CURRENTSTATES: + // CurrentState change listeners will be added later in LiveInstanceChange call. + break; + + default: + throw new HelixException(String.format("Unsupported source data type: %s", sourceDataType)); } try { @@ -128,12 +154,40 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc e); } } + + // For periodic refresh + if (isPeriodicRefreshEnabled) { + _lastRefreshTimestamp = System.currentTimeMillis(); // Initialize timestamp with current time + _periodRefreshInterval = periodRefreshInterval; + // Construct a periodic refresh context + final NotificationContext periodicRefreshContext = new NotificationContext(_helixManager); + periodicRefreshContext.setType(NotificationContext.Type.PERIODIC_REFRESH); + // Create a thread that runs at specified interval + _periodicRefreshExecutor = new ScheduledThreadPoolExecutor(1); + _periodicRefreshExecutor.scheduleAtFixedRate(new Runnable() { + @Override + public void run() { + // If enough time has elapsed since last refresh, queue a refresh event + if (_lastRefreshTimestamp + _periodRefreshInterval < System.currentTimeMillis()) { + // changeType is irrelevant for NotificationContext.Type.PERIODIC_REFRESH + _routerUpdater.queueEvent(periodicRefreshContext, ClusterEventType.PeriodicalRebalance, + null); + } + } + }, _periodRefreshInterval, _periodRefreshInterval, TimeUnit.MILLISECONDS); + } else { + _isPeriodicRefreshEnabled = false; + } } /** * Shutdown current RoutingTableProvider. Once it is shutdown, it should never be reused. */ public void shutdown() { + if (_periodicRefreshExecutor != null) { + _periodicRefreshExecutor.purge(); + _periodicRefreshExecutor.shutdown(); + } _routerUpdater.shutdown(); if (_helixManager != null) { PropertyKey.Builder keyBuilder = _helixManager.getHelixDataAccessor().keyBuilder(); @@ -147,7 +201,7 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc case CURRENTSTATES: NotificationContext context = new NotificationContext(_helixManager); context.setType(NotificationContext.Type.FINALIZE); - updateCurrentStatesListeners(Collections.<LiveInstance>emptyList(), context); + updateCurrentStatesListeners(Collections.<LiveInstance> emptyList(), context); break; default: break; @@ -158,7 +212,6 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc /** * Get an snapshot of current RoutingTable information. The snapshot is immutable, it reflects the * routing table information at the time this method is called. - * * @return snapshot of current routing table. */ public RoutingTableSnapshot getRoutingTableSnapshot() { @@ -167,29 +220,30 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc /** * Add RoutingTableChangeListener with user defined context - * * @param routingTableChangeListener * @param context user defined context */ - public void addRoutingTableChangeListener(final RoutingTableChangeListener routingTableChangeListener, - Object context) { + public void addRoutingTableChangeListener( + final RoutingTableChangeListener routingTableChangeListener, Object context) { _routingTableChangeListenerMap.put(routingTableChangeListener, new ListenerContext(context)); + logger.info("Attach RoutingTableProviderChangeListener {}", + routingTableChangeListener.getClass().getName()); } /** * Remove RoutingTableChangeListener - * * @param routingTableChangeListener */ public Object removeRoutingTableChangeListener( final RoutingTableChangeListener routingTableChangeListener) { + logger.info("Detach RoutingTableProviderChangeListener {}", + routingTableChangeListener.getClass().getName()); return _routingTableChangeListenerMap.remove(routingTableChangeListener); } /** * returns the instances for {resource,partition} pair that are in a specific * {state} - * * This method will be deprecated, please use the * {@link #getInstancesForResource(String, String, String)} getInstancesForResource} method. * @param resourceName @@ -198,7 +252,8 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc * @param state * @return empty list if there is no instance in a given state */ - public List<InstanceConfig> getInstances(String resourceName, String partitionName, String state) { + public List<InstanceConfig> getInstances(String resourceName, String partitionName, + String state) { return getInstancesForResource(resourceName, partitionName, state); } @@ -211,21 +266,19 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc * @param state * @return empty list if there is no instance in a given state */ - public List<InstanceConfig> getInstancesForResource(String resourceName, String partitionName, String state) { + public List<InstanceConfig> getInstancesForResource(String resourceName, String partitionName, + String state) { return _routingTableRef.get().getInstancesForResource(resourceName, partitionName, state); } /** * returns the instances for {resource group,partition} pair in all resources belongs to the given * resource group that are in a specific {state}. - * * The return results aggregate all partition states from all the resources in the given resource * group. - * * @param resourceGroupName * @param partitionName * @param state - * * @return empty list if there is no instance in a given state */ public List<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName, @@ -237,26 +290,22 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc /** * returns the instances for {resource group,partition} pair contains any of the given tags * that are in a specific {state}. - * * Find all resources belongs to the given resource group that have any of the given resource tags * and return the aggregated partition states from all these resources. - * * @param resourceGroupName * @param partitionName * @param state * @param resourceTags - * * @return empty list if there is no instance in a given state */ public List<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName, String partitionName, String state, List<String> resourceTags) { - return _routingTableRef.get() - .getInstancesForResourceGroup(resourceGroupName, partitionName, state, resourceTags); + return _routingTableRef.get().getInstancesForResourceGroup(resourceGroupName, partitionName, + state, resourceTags); } /** * returns all instances for {resource} that are in a specific {state} - * * This method will be deprecated, please use the * {@link #getInstancesForResource(String, String) getInstancesForResource} method. * @param resourceName @@ -279,10 +328,8 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc /** * returns all instances for all resources in {resource group} that are in a specific {state} - * * @param resourceGroupName * @param state - * * @return empty list if there is no instance in a given state */ public Set<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName, String state) { @@ -292,10 +339,8 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc /** * returns all instances for resources contains any given tags in {resource group} that are in a * specific {state} - * * @param resourceGroupName * @param state - * * @return empty list if there is no instance in a given state */ public Set<InstanceConfig> getInstancesForResourceGroup(String resourceGroupName, String state, @@ -333,13 +378,15 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc NotificationContext changeContext) { HelixConstants.ChangeType changeType = changeContext.getChangeType(); if (changeType != null && !changeType.getPropertyType().equals(_sourceDataType)) { - logger.warn("onExternalViewChange called with dis-matched change types. Source data type " - + _sourceDataType + ", changed data type: " + changeType); + logger.warn( + "onExternalViewChange called with mismatched change types. Source data type {}, changed data type: {}", + _sourceDataType, changeType); return; } // Refresh with full list of external view. if (externalViewList != null && externalViewList.size() > 0) { - // keep this here for back-compatibility, application can call onExternalViewChange directly with externalview list supplied. + // keep this here for back-compatibility, application can call onExternalViewChange directly + // with externalview list supplied. refresh(externalViewList, changeContext); } else { ClusterEventType eventType; @@ -348,8 +395,9 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc } else if (_sourceDataType.equals(PropertyType.TARGETEXTERNALVIEW)) { eventType = ClusterEventType.TargetExternalViewChange; } else { - logger.warn("onExternalViewChange called with dis-matched change types. Source data type " - + _sourceDataType + ", change type: " + changeType); + logger.warn( + "onExternalViewChange called with mismatched change types. Source data type {}, change type: {}", + _sourceDataType, changeType); return; } _routerUpdater.queueEvent(changeContext, eventType, changeType); @@ -366,8 +414,7 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc @Override @PreFetch(enabled = false) - public void onConfigChange(List<InstanceConfig> configs, - NotificationContext changeContext) { + public void onConfigChange(List<InstanceConfig> configs, NotificationContext changeContext) { onInstanceConfigChange(configs, changeContext); } @@ -386,8 +433,8 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc @Override @PreFetch(enabled = false) - public void onStateChange(String instanceName, - List<CurrentState> statesInfo, NotificationContext changeContext) { + public void onStateChange(String instanceName, List<CurrentState> statesInfo, + NotificationContext changeContext) { if (_sourceDataType.equals(PropertyType.CURRENTSTATES)) { _routerUpdater.queueEvent(changeContext, ClusterEventType.CurrentStateChange, HelixConstants.ChangeType.CURRENT_STATE); @@ -410,7 +457,7 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc if (changeContext.getType() == NotificationContext.Type.FINALIZE) { // on finalize, should remove all current-state listeners - logger.info("remove current-state listeners. lastSeenSessions: " + _lastSeenSessions); + logger.info("remove current-state listeners. lastSeenSessions: {}", _lastSeenSessions); liveInstances = Collections.emptyList(); } @@ -433,11 +480,12 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc try { // add current-state listeners for new sessions manager.addCurrentStateChangeListener(this, instanceName, session); - logger.info(manager.getInstanceName() + " added current-state listener for instance: " - + instanceName + ", session: " + session + ", listener: " + this); + logger.info( + "{} added current-state listener for instance: {}, session: {}, listener: {}", + manager.getInstanceName(), instanceName, session, this); } catch (Exception e) { - logger.error("Fail to add current state listener for instance: " + instanceName - + " with session: " + session, e); + logger.error("Fail to add current state listener for instance: {} with session: {}", + instanceName, session, e); } } } @@ -447,8 +495,8 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc if (!curSessions.containsKey(session)) { String instanceName = lastSessions.get(session).getInstanceName(); manager.removeListener(keyBuilder.currentStates(instanceName, session), this); - logger.info("remove current-state listener for instance:" + instanceName + ", session: " - + session); + logger.info("remove current-state listener for instance: {}, session: {}", instanceName, + session); } } @@ -476,10 +524,7 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances) { long startTime = System.currentTimeMillis(); RoutingTable newRoutingTable = new RoutingTable(externalViews, instanceConfigs, liveInstances); - _routingTableRef.set(newRoutingTable); - logger.info("Refreshed the RoutingTable for cluster " + (_helixManager != null ? _helixManager - .getClusterName() : null) + ", takes " + (System.currentTimeMillis() - startTime) + "ms."); - notifyRoutingTableChange(); + resetRoutingTableAndNotify(startTime, newRoutingTable); } protected void refresh(Map<String, Map<String, Map<String, CurrentState>>> currentStateMap, @@ -487,10 +532,20 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc long startTime = System.currentTimeMillis(); RoutingTable newRoutingTable = new RoutingTable(currentStateMap, instanceConfigs, liveInstances); + resetRoutingTableAndNotify(startTime, newRoutingTable); + } + + private void resetRoutingTableAndNotify(long startTime, RoutingTable newRoutingTable) { _routingTableRef.set(newRoutingTable); - logger.info("Refresh the RoutingTable for cluster " + (_helixManager != null ? _helixManager - .getClusterName() : null) + ", takes " + (System.currentTimeMillis() - startTime) + "ms."); + logger.info("Refresh the RoutingTable for cluster {}, takes {} ms.", + (_helixManager != null ? _helixManager.getClusterName() : null), + (System.currentTimeMillis() - startTime)); notifyRoutingTableChange(); + + // Update timestamp for last refresh + if (_isPeriodicRefreshEnabled) { + _lastRefreshTimestamp = System.currentTimeMillis(); + } } private void notifyRoutingTableChange() { @@ -505,7 +560,7 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc private final RoutingDataCache _dataCache; public RouterUpdater(String clusterName, PropertyType sourceDataType) { - super("Helix-RouterUpdater"); + super("Helix-RouterUpdater-event_process"); _dataCache = new RoutingDataCache(clusterName, sourceDataType); } @@ -519,30 +574,26 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc // refresh routing table. HelixManager manager = event.getAttribute(AttributeName.helixmanager.name()); if (manager == null) { - logger.error("HelixManager is null for router update event : " + event); + logger.error(String.format("HelixManager is null for router update event: %s", event)); throw new HelixException("HelixManager is null for router update event."); } _dataCache.refresh(manager.getHelixDataAccessor()); - switch (_sourceDataType) { case EXTERNALVIEW: refresh(_dataCache.getExternalViews().values(), _dataCache.getInstanceConfigMap().values(), _dataCache.getLiveInstances().values()); break; - case TARGETEXTERNALVIEW: refresh(_dataCache.getTargetExternalViews().values(), _dataCache.getInstanceConfigMap().values(), _dataCache.getLiveInstances().values()); break; - case CURRENTSTATES: refresh(_dataCache.getCurrentStatesMap(), _dataCache.getInstanceConfigMap().values(), _dataCache.getLiveInstances().values()); break; - default: - logger.warn("Unsupported source data type: " + _sourceDataType - + ", stop refreshing the routing table!"); + logger.warn("Unsupported source data type: {}, stop refreshing the routing table!", + _sourceDataType); } } } @@ -550,12 +601,14 @@ public class RoutingTableProvider implements ExternalViewChangeListener, Instanc public void queueEvent(NotificationContext context, ClusterEventType eventType, HelixConstants.ChangeType changeType) { ClusterEvent event = new ClusterEvent(_clusterName, eventType); - if (context == null || context.getType() != NotificationContext.Type.CALLBACK) { + if (context == null || context.getType() != NotificationContext.Type.CALLBACK + || context.getType() == NotificationContext.Type.PERIODIC_REFRESH) { _dataCache.requireFullRefresh(); } else { _dataCache.notifyDataChange(changeType, context.getPathChanged()); } + // Null check for manager in the following line is done in handleEvent() event.addAttribute(AttributeName.helixmanager.name(), context.getManager()); event.addAttribute(AttributeName.changeContext.name(), context); queueEvent(event); http://git-wip-us.apache.org/repos/asf/helix/blob/0e4163f1/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderPeriodicRefresh.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderPeriodicRefresh.java b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderPeriodicRefresh.java new file mode 100644 index 0000000..dac7617 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderPeriodicRefresh.java @@ -0,0 +1,218 @@ +package org.apache.helix.integration.spectator; + +import java.util.ArrayList; +import java.util.Collection; +import java.util.Date; +import java.util.List; +import java.util.Map; +import org.apache.helix.HelixManager; +import org.apache.helix.HelixManagerFactory; +import org.apache.helix.InstanceType; +import org.apache.helix.NotificationContext; +import org.apache.helix.PropertyType; +import org.apache.helix.integration.common.ZkIntegrationTestBase; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.model.BuiltInStateModelDefinitions; +import org.apache.helix.model.CurrentState; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.model.LiveInstance; +import org.apache.helix.spectator.RoutingTableProvider; +import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; +import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class TestRoutingTableProviderPeriodicRefresh extends ZkIntegrationTestBase { + private static final org.slf4j.Logger logger = LoggerFactory.getLogger(TestRoutingTableProviderPeriodicRefresh.class); + + private static final String STATE_MODEL = BuiltInStateModelDefinitions.MasterSlave.name(); + private static final String TEST_DB = "TestDB"; + private static final String CLASS_NAME = TestRoutingTableProvider.class.getSimpleName(); + private static final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME; + private static final int PARTICIPANT_NUMBER = 3; + private static final int PARTICIPANT_START_PORT = 12918; + + private static final int PARTITION_NUMBER = 20; + private static final int REPLICA_NUMBER = 3; + + private HelixManager _spectator; + private HelixManager _spectator_2; + private HelixManager _spectator_3; + private List<MockParticipantManager> _participants = new ArrayList<>(); + private List<String> _instances = new ArrayList<>(); + private ClusterControllerManager _controller; + private HelixClusterVerifier _clusterVerifier; + private MockRoutingTableProvider _routingTableProvider; + private MockRoutingTableProvider _routingTableProviderNoPeriodicRefresh; + private MockRoutingTableProvider _routingTableProviderLongPeriodicRefresh; + + @BeforeClass + public void beforeClass() throws Exception { + System.out + .println("START " + getShortClassName() + " at " + new Date(System.currentTimeMillis())); + + // setup storage cluster + _gSetupTool.addCluster(CLUSTER_NAME, true); + + for (int i = 0; i < PARTICIPANT_NUMBER; i++) { + String instance = PARTICIPANT_PREFIX + "_" + (PARTICIPANT_START_PORT + i); + _gSetupTool.addInstanceToCluster(CLUSTER_NAME, instance); + _instances.add(instance); + } + + // start dummy participants + for (int i = 0; i < PARTICIPANT_NUMBER; i++) { + MockParticipantManager participant = + new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, _instances.get(i)); + participant.syncStart(); + _participants.add(participant); + } + + createDBInSemiAuto(_gSetupTool, CLUSTER_NAME, TEST_DB, _instances, STATE_MODEL, + PARTITION_NUMBER, REPLICA_NUMBER); + + // start controller + String controllerName = CONTROLLER_PREFIX + "_0"; + _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, controllerName); + _controller.syncStart(); + + // start speculator - initialize it with a Mock + _spectator = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "spectator", + InstanceType.SPECTATOR, ZK_ADDR); + _spectator.connect(); + + _spectator_2 = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "spectator_2", + InstanceType.SPECTATOR, ZK_ADDR); + _spectator_2.connect(); + + _spectator_3 = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, "spectator_3", + InstanceType.SPECTATOR, ZK_ADDR); + _spectator_3.connect(); + + _routingTableProvider = + new MockRoutingTableProvider(_spectator, PropertyType.EXTERNALVIEW, true, 1000L); + _spectator.addExternalViewChangeListener(_routingTableProvider); + _spectator.addLiveInstanceChangeListener(_routingTableProvider); + _spectator.addInstanceConfigChangeListener(_routingTableProvider); + + _routingTableProviderNoPeriodicRefresh = + new MockRoutingTableProvider(_spectator_2, PropertyType.EXTERNALVIEW, false, 1000L); + _spectator_2.addExternalViewChangeListener(_routingTableProviderNoPeriodicRefresh); + _spectator_2.addLiveInstanceChangeListener(_routingTableProviderNoPeriodicRefresh); + _spectator_2.addInstanceConfigChangeListener(_routingTableProviderNoPeriodicRefresh); + + _routingTableProviderLongPeriodicRefresh = + new MockRoutingTableProvider(_spectator_3, PropertyType.EXTERNALVIEW, true, 3000000L); + _spectator_3.addExternalViewChangeListener(_routingTableProviderLongPeriodicRefresh); + _spectator_3.addLiveInstanceChangeListener(_routingTableProviderLongPeriodicRefresh); + _spectator_3.addInstanceConfigChangeListener(_routingTableProviderLongPeriodicRefresh); + + _clusterVerifier = + new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkClient(_gZkClient).build(); + Assert.assertTrue(_clusterVerifier.verify()); + + } + + @AfterClass + public void afterClass() { + // stop participants + for (MockParticipantManager p : _participants) { + p.syncStop(); + } + + _controller.syncStop(); + _spectator.disconnect(); + _spectator_2.disconnect(); + _spectator_3.disconnect(); + _gSetupTool.deleteCluster(CLUSTER_NAME); + } + + public class MockRoutingTableProvider extends RoutingTableProvider { + private volatile int _refreshCount = 0; + private static final boolean DEBUG = false; + + public MockRoutingTableProvider(HelixManager helixManager, PropertyType sourceDataType, + boolean isPeriodicRefreshEnabled, long periodRefreshInterval) { + super(helixManager, sourceDataType, isPeriodicRefreshEnabled, periodRefreshInterval); + } + + @Override + public synchronized void refresh(List<ExternalView> externalViewList, + NotificationContext changeContext) { + super.refresh(externalViewList, changeContext); + _refreshCount++; + if (DEBUG) { + print(); + } + } + + @Override + public synchronized void refresh(Collection<ExternalView> externalViews, + Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances) { + super.refresh(externalViews, instanceConfigs, liveInstances); + _refreshCount++; + if (DEBUG) { + print(); + } + } + + @Override + protected synchronized void refresh( + Map<String, Map<String, Map<String, CurrentState>>> currentStateMap, + Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> liveInstances) { + super.refresh(currentStateMap, instanceConfigs, liveInstances); + _refreshCount++; + if (DEBUG) { + print(); + } + } + + // Log statements for debugging purposes + private void print() { + logger.error("Refresh happened; count: {}", getRefreshCount()); + logger.error("timestamp: {}", System.currentTimeMillis()); + } + + synchronized int getRefreshCount() { + return _refreshCount; + } + } + + @Test + public void testPeriodicRefresh() throws InterruptedException { + // Wait so that initial refreshes finish (not triggered by periodic refresh timer) + Thread.sleep(1000L); + + // Test short refresh + int prevRefreshCount = _routingTableProvider.getRefreshCount(); + // Wait for one timer duration + Thread.sleep(1000L); + // The timer should have gone off, incrementing the refresh count + Assert.assertEquals(_routingTableProvider.getRefreshCount(), prevRefreshCount + 1); + + // Test no periodic refresh + prevRefreshCount = _routingTableProviderNoPeriodicRefresh.getRefreshCount(); + // Wait + Thread.sleep(2000); + // The timer should NOT have gone off, the refresh count must stay the same + Assert.assertEquals(_routingTableProviderNoPeriodicRefresh.getRefreshCount(), prevRefreshCount); + + // Test long periodic refresh + prevRefreshCount = _routingTableProviderLongPeriodicRefresh.getRefreshCount(); + // Wait + Thread.sleep(2000); + // The timer should NOT have gone off yet, the refresh count must stay the same + Assert.assertEquals(_routingTableProviderLongPeriodicRefresh.getRefreshCount(), + prevRefreshCount); + + // Call shutdown to make sure they are shutting down properly + _routingTableProvider.shutdown(); + _routingTableProviderNoPeriodicRefresh.shutdown(); + _routingTableProviderLongPeriodicRefresh.shutdown(); + } +}
