Repository: helix
Updated Branches:
  refs/heads/master e1faf2404 -> 0e4163f18


[HELIX-698] Add periodic refresh to RoutingTableProvider

There have been incidents where RoutingTableProvider did not get a timely 
refresh, potentially due to lag in the ZKClient CallbackHandler or connectivity 
issues. This change adds a periodic refresh so that RoutingTableProvider 
does not fall severely behind when change callbacks are delayed.


Project: http://git-wip-us.apache.org/repos/asf/helix/repo
Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/0e4163f1
Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/0e4163f1
Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/0e4163f1

Branch: refs/heads/master
Commit: 0e4163f18c1274c0f77320698e9dfbf42314810d
Parents: e1faf24
Author: Hunter Lee <[email protected]>
Authored: Thu Apr 19 13:42:37 2018 -0700
Committer: Hunter Lee <[email protected]>
Committed: Thu Apr 19 14:07:52 2018 -0700

----------------------------------------------------------------------
 .../org/apache/helix/NotificationContext.java   |   1 +
 .../common/caches/BasicClusterDataCache.java    |   2 +-
 .../helix/spectator/RoutingTableProvider.java   | 225 ++++++++++++-------
 ...TestRoutingTableProviderPeriodicRefresh.java | 218 ++++++++++++++++++
 4 files changed, 359 insertions(+), 87 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/helix/blob/0e4163f1/helix-core/src/main/java/org/apache/helix/NotificationContext.java
----------------------------------------------------------------------
diff --git a/helix-core/src/main/java/org/apache/helix/NotificationContext.java 
b/helix-core/src/main/java/org/apache/helix/NotificationContext.java
index dd76b60..9664f66 100644
--- a/helix-core/src/main/java/org/apache/helix/NotificationContext.java
+++ b/helix-core/src/main/java/org/apache/helix/NotificationContext.java
@@ -188,6 +188,7 @@ public class NotificationContext {
   public enum Type {
     INIT,
     CALLBACK,
+    PERIODIC_REFRESH,
     FINALIZE
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/0e4163f1/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
 
b/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
index d6e324d..06fcaf6 100644
--- 
a/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
+++ 
b/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java
@@ -225,7 +225,7 @@ public class BasicClusterDataCache {
    */
   public void requireFullRefresh() {
     for(HelixConstants.ChangeType type : HelixConstants.ChangeType.values()) {
-      _propertyDataChangedMap.put(type, Boolean.valueOf(true));
+      _propertyDataChangedMap.put(type, Boolean.TRUE);
     }
   }
 

http://git-wip-us.apache.org/repos/asf/helix/blob/0e4163f1/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java 
b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
index 4076697..f72d66a 100644
--- 
a/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
+++ 
b/helix-core/src/main/java/org/apache/helix/spectator/RoutingTableProvider.java
@@ -26,6 +26,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ScheduledThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
 import org.apache.helix.HelixConstants;
@@ -53,25 +55,47 @@ import org.apache.helix.model.LiveInstance;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-public class RoutingTableProvider implements ExternalViewChangeListener, 
InstanceConfigChangeListener,
-    ConfigChangeListener, LiveInstanceChangeListener, 
CurrentStateChangeListener {
+public class RoutingTableProvider
+    implements ExternalViewChangeListener, InstanceConfigChangeListener, 
ConfigChangeListener,
+               LiveInstanceChangeListener, CurrentStateChangeListener {
   private static final Logger logger = 
LoggerFactory.getLogger(RoutingTableProvider.class);
+  private static final long DEFAULT_PERIODIC_REFRESH_INTERVAL = 300000; // 5 
minutes
   private final AtomicReference<RoutingTable> _routingTableRef;
   private final HelixManager _helixManager;
   private final RouterUpdater _routerUpdater;
   private final PropertyType _sourceDataType;
   private final Map<RoutingTableChangeListener, ListenerContext> 
_routingTableChangeListenerMap;
 
+  // For periodic refresh
+  private long _lastRefreshTimestamp;
+  private boolean _isPeriodicRefreshEnabled = true; // Default is enabled
+  private long _periodRefreshInterval;
+  private ScheduledThreadPoolExecutor _periodicRefreshExecutor;
+
   public RoutingTableProvider() {
     this(null);
   }
 
   public RoutingTableProvider(HelixManager helixManager) throws HelixException 
{
-    this(helixManager, PropertyType.EXTERNALVIEW);
+    this(helixManager, PropertyType.EXTERNALVIEW, true, 
DEFAULT_PERIODIC_REFRESH_INTERVAL);
   }
 
   public RoutingTableProvider(HelixManager helixManager, PropertyType 
sourceDataType)
       throws HelixException {
+    this(helixManager, sourceDataType, true, 
DEFAULT_PERIODIC_REFRESH_INTERVAL);
+  }
+
+  /**
+   * Initialize an instance of RoutingTableProvider
+   *
+   * @param helixManager
+   * @param sourceDataType
+   * @param isPeriodicRefreshEnabled true if periodic refresh is enabled, 
false otherwise
+   * @param periodRefreshInterval only effective if isPeriodicRefreshEnabled is 
true
+   * @throws HelixException
+   */
+  public RoutingTableProvider(HelixManager helixManager, PropertyType 
sourceDataType,
+      boolean isPeriodicRefreshEnabled, long periodRefreshInterval) throws 
HelixException {
     _routingTableRef = new AtomicReference<>(new RoutingTable());
     _helixManager = helixManager;
     _sourceDataType = sourceDataType;
@@ -79,41 +103,43 @@ public class RoutingTableProvider implements 
ExternalViewChangeListener, Instanc
     String clusterName = _helixManager != null ? 
_helixManager.getClusterName() : null;
     _routerUpdater = new RouterUpdater(clusterName, _sourceDataType);
     _routerUpdater.start();
+
     if (_helixManager != null) {
       switch (_sourceDataType) {
-      case EXTERNALVIEW:
-        try {
-          _helixManager.addExternalViewChangeListener(this);
-        } catch (Exception e) {
-          shutdown();
-          logger.error("Failed to attach ExternalView Listener to 
HelixManager!");
-          throw new HelixException("Failed to attach ExternalView Listener to 
HelixManager!", e);
-        }
-        break;
-
-      case TARGETEXTERNALVIEW:
-        // Check whether target external has been enabled or not
-        if (!_helixManager.getHelixDataAccessor().getBaseDataAccessor().exists(
-            
_helixManager.getHelixDataAccessor().keyBuilder().targetExternalViews().getPath(),
 0)) {
-          shutdown();
-          throw new HelixException("Target External View is not enabled!");
-        }
+        case EXTERNALVIEW:
+          try {
+            _helixManager.addExternalViewChangeListener(this);
+          } catch (Exception e) {
+            shutdown();
+            logger.error("Failed to attach ExternalView Listener to 
HelixManager!");
+            throw new HelixException("Failed to attach ExternalView Listener 
to HelixManager!", e);
+          }
+          break;
 
-        try {
-          _helixManager.addTargetExternalViewChangeListener(this);
-        } catch (Exception e) {
-          shutdown();
-          logger.error("Failed to attach TargetExternalView Listener to 
HelixManager!");
-          throw new HelixException("Failed to attach TargetExternalView 
Listener to HelixManager!", e);
-        }
-        break;
+        case TARGETEXTERNALVIEW:
+          // Check whether target external has been enabled or not
+          if 
(!_helixManager.getHelixDataAccessor().getBaseDataAccessor().exists(
+              
_helixManager.getHelixDataAccessor().keyBuilder().targetExternalViews().getPath(),
 0)) {
+            shutdown();
+            throw new HelixException("Target External View is not enabled!");
+          }
 
-      case CURRENTSTATES:
-        // CurrentState change listeners will be added later in 
LiveInstanceChange call.
-        break;
+          try {
+            _helixManager.addTargetExternalViewChangeListener(this);
+          } catch (Exception e) {
+            shutdown();
+            logger.error("Failed to attach TargetExternalView Listener to 
HelixManager!");
+            throw new HelixException("Failed to attach TargetExternalView 
Listener to HelixManager!",
+                e);
+          }
+          break;
 
-      default:
-        throw new HelixException("Unsupported source data type: " + 
sourceDataType);
+        case CURRENTSTATES:
+          // CurrentState change listeners will be added later in 
LiveInstanceChange call.
+          break;
+
+        default:
+          throw new HelixException(String.format("Unsupported source data 
type: %s", sourceDataType));
       }
 
       try {
@@ -128,12 +154,40 @@ public class RoutingTableProvider implements 
ExternalViewChangeListener, Instanc
             e);
       }
     }
+
+    // For periodic refresh
+    if (isPeriodicRefreshEnabled) {
+      _lastRefreshTimestamp = System.currentTimeMillis(); // Initialize 
timestamp with current time
+      _periodRefreshInterval = periodRefreshInterval;
+      // Construct a periodic refresh context
+      final NotificationContext periodicRefreshContext = new 
NotificationContext(_helixManager);
+      
periodicRefreshContext.setType(NotificationContext.Type.PERIODIC_REFRESH);
+      // Create a thread that runs at specified interval
+      _periodicRefreshExecutor = new ScheduledThreadPoolExecutor(1);
+      _periodicRefreshExecutor.scheduleAtFixedRate(new Runnable() {
+        @Override
+        public void run() {
+          // If enough time has elapsed since last refresh, queue a refresh 
event
+          if (_lastRefreshTimestamp + _periodRefreshInterval < 
System.currentTimeMillis()) {
+            // changeType is irrelevant for 
NotificationContext.Type.PERIODIC_REFRESH
+            _routerUpdater.queueEvent(periodicRefreshContext, 
ClusterEventType.PeriodicalRebalance,
+                null);
+          }
+        }
+      }, _periodRefreshInterval, _periodRefreshInterval, 
TimeUnit.MILLISECONDS);
+    } else {
+      _isPeriodicRefreshEnabled = false;
+    }
   }
 
   /**
    * Shutdown current RoutingTableProvider. Once it is shutdown, it should 
never be reused.
    */
   public void shutdown() {
+    if (_periodicRefreshExecutor != null) {
+      _periodicRefreshExecutor.purge();
+      _periodicRefreshExecutor.shutdown();
+    }
     _routerUpdater.shutdown();
     if (_helixManager != null) {
       PropertyKey.Builder keyBuilder = 
_helixManager.getHelixDataAccessor().keyBuilder();
@@ -147,7 +201,7 @@ public class RoutingTableProvider implements 
ExternalViewChangeListener, Instanc
         case CURRENTSTATES:
           NotificationContext context = new NotificationContext(_helixManager);
           context.setType(NotificationContext.Type.FINALIZE);
-          updateCurrentStatesListeners(Collections.<LiveInstance>emptyList(), 
context);
+          updateCurrentStatesListeners(Collections.<LiveInstance> emptyList(), 
context);
           break;
         default:
           break;
@@ -158,7 +212,6 @@ public class RoutingTableProvider implements 
ExternalViewChangeListener, Instanc
   /**
    * Get an snapshot of current RoutingTable information. The snapshot is 
immutable, it reflects the
    * routing table information at the time this method is called.
-   *
    * @return snapshot of current routing table.
    */
   public RoutingTableSnapshot getRoutingTableSnapshot() {
@@ -167,29 +220,30 @@ public class RoutingTableProvider implements 
ExternalViewChangeListener, Instanc
 
   /**
    * Add RoutingTableChangeListener with user defined context
-   *
    * @param routingTableChangeListener
    * @param context user defined context
    */
-  public void addRoutingTableChangeListener(final RoutingTableChangeListener 
routingTableChangeListener,
-      Object context) {
+  public void addRoutingTableChangeListener(
+      final RoutingTableChangeListener routingTableChangeListener, Object 
context) {
     _routingTableChangeListenerMap.put(routingTableChangeListener, new 
ListenerContext(context));
+    logger.info("Attach RoutingTableProviderChangeListener {}",
+        routingTableChangeListener.getClass().getName());
   }
 
   /**
    * Remove RoutingTableChangeListener
-   *
    * @param routingTableChangeListener
    */
   public Object removeRoutingTableChangeListener(
       final RoutingTableChangeListener routingTableChangeListener) {
+    logger.info("Detach RoutingTableProviderChangeListener {}",
+        routingTableChangeListener.getClass().getName());
     return _routingTableChangeListenerMap.remove(routingTableChangeListener);
   }
 
   /**
    * returns the instances for {resource,partition} pair that are in a specific
    * {state}
-   *
    * This method will be deprecated, please use the
    * {@link #getInstancesForResource(String, String, String)} 
getInstancesForResource} method.
    * @param resourceName
@@ -198,7 +252,8 @@ public class RoutingTableProvider implements 
ExternalViewChangeListener, Instanc
    * @param state
    * @return empty list if there is no instance in a given state
    */
-  public List<InstanceConfig> getInstances(String resourceName, String 
partitionName, String state) {
+  public List<InstanceConfig> getInstances(String resourceName, String 
partitionName,
+      String state) {
     return getInstancesForResource(resourceName, partitionName, state);
   }
 
@@ -211,21 +266,19 @@ public class RoutingTableProvider implements 
ExternalViewChangeListener, Instanc
    * @param state
    * @return empty list if there is no instance in a given state
    */
-  public List<InstanceConfig> getInstancesForResource(String resourceName, 
String partitionName, String state) {
+  public List<InstanceConfig> getInstancesForResource(String resourceName, 
String partitionName,
+      String state) {
     return _routingTableRef.get().getInstancesForResource(resourceName, 
partitionName, state);
   }
 
   /**
    * returns the instances for {resource group,partition} pair in all 
resources belongs to the given
    * resource group that are in a specific {state}.
-   *
    * The return results aggregate all partition states from all the resources 
in the given resource
    * group.
-   *
    * @param resourceGroupName
    * @param partitionName
    * @param state
-   *
    * @return empty list if there is no instance in a given state
    */
   public List<InstanceConfig> getInstancesForResourceGroup(String 
resourceGroupName,
@@ -237,26 +290,22 @@ public class RoutingTableProvider implements 
ExternalViewChangeListener, Instanc
   /**
    * returns the instances for {resource group,partition} pair contains any of 
the given tags
    * that are in a specific {state}.
-   *
    * Find all resources belongs to the given resource group that have any of 
the given resource tags
    * and return the aggregated partition states from all these resources.
-   *
    * @param resourceGroupName
    * @param partitionName
    * @param state
    * @param resourceTags
-   *
    * @return empty list if there is no instance in a given state
    */
   public List<InstanceConfig> getInstancesForResourceGroup(String 
resourceGroupName,
       String partitionName, String state, List<String> resourceTags) {
-    return _routingTableRef.get()
-        .getInstancesForResourceGroup(resourceGroupName, partitionName, state, 
resourceTags);
+    return 
_routingTableRef.get().getInstancesForResourceGroup(resourceGroupName, 
partitionName,
+        state, resourceTags);
   }
 
   /**
    * returns all instances for {resource} that are in a specific {state}
-   *
    * This method will be deprecated, please use the
    * {@link #getInstancesForResource(String, String) getInstancesForResource} 
method.
    * @param resourceName
@@ -279,10 +328,8 @@ public class RoutingTableProvider implements 
ExternalViewChangeListener, Instanc
 
   /**
    * returns all instances for all resources in {resource group} that are in a 
specific {state}
-   *
    * @param resourceGroupName
    * @param state
-   *
    * @return empty list if there is no instance in a given state
    */
   public Set<InstanceConfig> getInstancesForResourceGroup(String 
resourceGroupName, String state) {
@@ -292,10 +339,8 @@ public class RoutingTableProvider implements 
ExternalViewChangeListener, Instanc
   /**
    * returns all instances for resources contains any given tags in {resource 
group} that are in a
    * specific {state}
-   *
    * @param resourceGroupName
    * @param state
-   *
    * @return empty list if there is no instance in a given state
    */
   public Set<InstanceConfig> getInstancesForResourceGroup(String 
resourceGroupName, String state,
@@ -333,13 +378,15 @@ public class RoutingTableProvider implements 
ExternalViewChangeListener, Instanc
       NotificationContext changeContext) {
     HelixConstants.ChangeType changeType = changeContext.getChangeType();
     if (changeType != null && 
!changeType.getPropertyType().equals(_sourceDataType)) {
-      logger.warn("onExternalViewChange called with dis-matched change types. 
Source data type "
-          + _sourceDataType + ", changed data type: " + changeType);
+      logger.warn(
+          "onExternalViewChange called with mismatched change types. Source 
data type {}, changed data type: {}",
+          _sourceDataType, changeType);
       return;
     }
     // Refresh with full list of external view.
     if (externalViewList != null && externalViewList.size() > 0) {
-      // keep this here for back-compatibility, application can call 
onExternalViewChange directly with externalview list supplied.
+      // keep this here for back-compatibility, application can call 
onExternalViewChange directly
+      // with externalview list supplied.
       refresh(externalViewList, changeContext);
     } else {
       ClusterEventType eventType;
@@ -348,8 +395,9 @@ public class RoutingTableProvider implements 
ExternalViewChangeListener, Instanc
       } else if (_sourceDataType.equals(PropertyType.TARGETEXTERNALVIEW)) {
         eventType = ClusterEventType.TargetExternalViewChange;
       } else {
-        logger.warn("onExternalViewChange called with dis-matched change 
types. Source data type "
-            + _sourceDataType + ", change type: " + changeType);
+        logger.warn(
+            "onExternalViewChange called with mismatched change types. Source 
data type {}, change type: {}",
+            _sourceDataType, changeType);
         return;
       }
       _routerUpdater.queueEvent(changeContext, eventType, changeType);
@@ -366,8 +414,7 @@ public class RoutingTableProvider implements 
ExternalViewChangeListener, Instanc
 
   @Override
   @PreFetch(enabled = false)
-  public void onConfigChange(List<InstanceConfig> configs,
-      NotificationContext changeContext) {
+  public void onConfigChange(List<InstanceConfig> configs, NotificationContext 
changeContext) {
     onInstanceConfigChange(configs, changeContext);
   }
 
@@ -386,8 +433,8 @@ public class RoutingTableProvider implements 
ExternalViewChangeListener, Instanc
 
   @Override
   @PreFetch(enabled = false)
-  public void onStateChange(String instanceName,
-      List<CurrentState> statesInfo, NotificationContext changeContext) {
+  public void onStateChange(String instanceName, List<CurrentState> statesInfo,
+      NotificationContext changeContext) {
     if (_sourceDataType.equals(PropertyType.CURRENTSTATES)) {
       _routerUpdater.queueEvent(changeContext, 
ClusterEventType.CurrentStateChange,
           HelixConstants.ChangeType.CURRENT_STATE);
@@ -410,7 +457,7 @@ public class RoutingTableProvider implements 
ExternalViewChangeListener, Instanc
 
     if (changeContext.getType() == NotificationContext.Type.FINALIZE) {
       // on finalize, should remove all current-state listeners
-      logger.info("remove current-state listeners. lastSeenSessions: " + 
_lastSeenSessions);
+      logger.info("remove current-state listeners. lastSeenSessions: {}", 
_lastSeenSessions);
       liveInstances = Collections.emptyList();
     }
 
@@ -433,11 +480,12 @@ public class RoutingTableProvider implements 
ExternalViewChangeListener, Instanc
           try {
             // add current-state listeners for new sessions
             manager.addCurrentStateChangeListener(this, instanceName, session);
-            logger.info(manager.getInstanceName() + " added current-state 
listener for instance: "
-                + instanceName + ", session: " + session + ", listener: " + 
this);
+            logger.info(
+                "{} added current-state listener for instance: {}, session: 
{}, listener: {}",
+                manager.getInstanceName(), instanceName, session, this);
           } catch (Exception e) {
-            logger.error("Fail to add current state listener for instance: " + 
instanceName
-                + " with session: " + session, e);
+            logger.error("Fail to add current state listener for instance: {} 
with session: {}",
+                instanceName, session, e);
           }
         }
       }
@@ -447,8 +495,8 @@ public class RoutingTableProvider implements 
ExternalViewChangeListener, Instanc
         if (!curSessions.containsKey(session)) {
           String instanceName = lastSessions.get(session).getInstanceName();
           manager.removeListener(keyBuilder.currentStates(instanceName, 
session), this);
-          logger.info("remove current-state listener for instance:" + 
instanceName + ", session: "
-              + session);
+          logger.info("remove current-state listener for instance: {}, 
session: {}", instanceName,
+              session);
         }
       }
 
@@ -476,10 +524,7 @@ public class RoutingTableProvider implements 
ExternalViewChangeListener, Instanc
       Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> 
liveInstances) {
     long startTime = System.currentTimeMillis();
     RoutingTable newRoutingTable = new RoutingTable(externalViews, 
instanceConfigs, liveInstances);
-    _routingTableRef.set(newRoutingTable);
-    logger.info("Refreshed the RoutingTable for cluster " + (_helixManager != 
null ? _helixManager
-        .getClusterName() : null) + ", takes " + (System.currentTimeMillis() - 
startTime) + "ms.");
-    notifyRoutingTableChange();
+    resetRoutingTableAndNotify(startTime, newRoutingTable);
   }
 
   protected void refresh(Map<String, Map<String, Map<String, CurrentState>>> 
currentStateMap,
@@ -487,10 +532,20 @@ public class RoutingTableProvider implements 
ExternalViewChangeListener, Instanc
     long startTime = System.currentTimeMillis();
     RoutingTable newRoutingTable =
         new RoutingTable(currentStateMap, instanceConfigs, liveInstances);
+    resetRoutingTableAndNotify(startTime, newRoutingTable);
+  }
+
+  private void resetRoutingTableAndNotify(long startTime, RoutingTable 
newRoutingTable) {
     _routingTableRef.set(newRoutingTable);
-    logger.info("Refresh the RoutingTable for cluster " + (_helixManager != 
null ? _helixManager
-        .getClusterName() : null) + ", takes " + (System.currentTimeMillis() - 
startTime) + "ms.");
+    logger.info("Refresh the RoutingTable for cluster {}, takes {} ms.",
+        (_helixManager != null ? _helixManager.getClusterName() : null),
+        (System.currentTimeMillis() - startTime));
     notifyRoutingTableChange();
+
+    // Update timestamp for last refresh
+    if (_isPeriodicRefreshEnabled) {
+      _lastRefreshTimestamp = System.currentTimeMillis();
+    }
   }
 
   private void notifyRoutingTableChange() {
@@ -505,7 +560,7 @@ public class RoutingTableProvider implements 
ExternalViewChangeListener, Instanc
     private final RoutingDataCache _dataCache;
 
     public RouterUpdater(String clusterName, PropertyType sourceDataType) {
-      super("Helix-RouterUpdater");
+      super("Helix-RouterUpdater-event_process");
       _dataCache = new RoutingDataCache(clusterName, sourceDataType);
     }
 
@@ -519,30 +574,26 @@ public class RoutingTableProvider implements 
ExternalViewChangeListener, Instanc
         // refresh routing table.
         HelixManager manager = 
event.getAttribute(AttributeName.helixmanager.name());
         if (manager == null) {
-          logger.error("HelixManager is null for router update event : " + 
event);
+          logger.error(String.format("HelixManager is null for router update 
event: %s", event));
           throw new HelixException("HelixManager is null for router update 
event.");
         }
         _dataCache.refresh(manager.getHelixDataAccessor());
-
         switch (_sourceDataType) {
           case EXTERNALVIEW:
             refresh(_dataCache.getExternalViews().values(),
                 _dataCache.getInstanceConfigMap().values(), 
_dataCache.getLiveInstances().values());
             break;
-
           case TARGETEXTERNALVIEW:
             refresh(_dataCache.getTargetExternalViews().values(),
                 _dataCache.getInstanceConfigMap().values(), 
_dataCache.getLiveInstances().values());
             break;
-
           case CURRENTSTATES:
             refresh(_dataCache.getCurrentStatesMap(), 
_dataCache.getInstanceConfigMap().values(),
                 _dataCache.getLiveInstances().values());
             break;
-
           default:
-            logger.warn("Unsupported source data type: " + _sourceDataType
-                + ", stop refreshing the routing table!");
+            logger.warn("Unsupported source data type: {}, stop refreshing the 
routing table!",
+                _sourceDataType);
         }
       }
     }
@@ -550,12 +601,14 @@ public class RoutingTableProvider implements 
ExternalViewChangeListener, Instanc
     public void queueEvent(NotificationContext context, ClusterEventType 
eventType,
         HelixConstants.ChangeType changeType) {
       ClusterEvent event = new ClusterEvent(_clusterName, eventType);
-      if (context == null || context.getType() != 
NotificationContext.Type.CALLBACK) {
+      if (context == null || context.getType() != 
NotificationContext.Type.CALLBACK
+          || context.getType() == NotificationContext.Type.PERIODIC_REFRESH) {
         _dataCache.requireFullRefresh();
       } else {
         _dataCache.notifyDataChange(changeType, context.getPathChanged());
       }
 
+      // Null check for manager in the following line is done in handleEvent()
       event.addAttribute(AttributeName.helixmanager.name(), 
context.getManager());
       event.addAttribute(AttributeName.changeContext.name(), context);
       queueEvent(event);

http://git-wip-us.apache.org/repos/asf/helix/blob/0e4163f1/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderPeriodicRefresh.java
----------------------------------------------------------------------
diff --git 
a/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderPeriodicRefresh.java
 
b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderPeriodicRefresh.java
new file mode 100644
index 0000000..dac7617
--- /dev/null
+++ 
b/helix-core/src/test/java/org/apache/helix/integration/spectator/TestRoutingTableProviderPeriodicRefresh.java
@@ -0,0 +1,218 @@
+package org.apache.helix.integration.spectator;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.Date;
+import java.util.List;
+import java.util.Map;
+import org.apache.helix.HelixManager;
+import org.apache.helix.HelixManagerFactory;
+import org.apache.helix.InstanceType;
+import org.apache.helix.NotificationContext;
+import org.apache.helix.PropertyType;
+import org.apache.helix.integration.common.ZkIntegrationTestBase;
+import org.apache.helix.integration.manager.ClusterControllerManager;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.model.BuiltInStateModelDefinitions;
+import org.apache.helix.model.CurrentState;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+import org.apache.helix.spectator.RoutingTableProvider;
+import 
org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier;
+import org.apache.helix.tools.ClusterVerifiers.HelixClusterVerifier;
+import org.slf4j.LoggerFactory;
+import org.testng.Assert;
+import org.testng.annotations.AfterClass;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+public class TestRoutingTableProviderPeriodicRefresh extends 
ZkIntegrationTestBase {
+  private static final org.slf4j.Logger logger = 
LoggerFactory.getLogger(TestRoutingTableProviderPeriodicRefresh.class);
+
+  private static final String STATE_MODEL = 
BuiltInStateModelDefinitions.MasterSlave.name();
+  private static final String TEST_DB = "TestDB";
+  // Derive the name from this test class (not TestRoutingTableProvider) so that CLUSTER_NAME,
+  // which is built from CLASS_NAME, is unique to this test.
+  private static final String CLASS_NAME =
+      TestRoutingTableProviderPeriodicRefresh.class.getSimpleName();
+  private static final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME;
+  private static final int PARTICIPANT_NUMBER = 3;
+  private static final int PARTICIPANT_START_PORT = 12918;
+
+  private static final int PARTITION_NUMBER = 20;
+  private static final int REPLICA_NUMBER = 3;
+
+  private HelixManager _spectator;
+  private HelixManager _spectator_2;
+  private HelixManager _spectator_3;
+  private List<MockParticipantManager> _participants = new ArrayList<>();
+  private List<String> _instances = new ArrayList<>();
+  private ClusterControllerManager _controller;
+  private HelixClusterVerifier _clusterVerifier;
+  private MockRoutingTableProvider _routingTableProvider;
+  private MockRoutingTableProvider _routingTableProviderNoPeriodicRefresh;
+  private MockRoutingTableProvider _routingTableProviderLongPeriodicRefresh;
+
+  @BeforeClass
+  public void beforeClass() throws Exception {
+    System.out
+        .println("START " + getShortClassName() + " at " + new 
Date(System.currentTimeMillis()));
+
+    // setup storage cluster
+    _gSetupTool.addCluster(CLUSTER_NAME, true);
+
+    for (int i = 0; i < PARTICIPANT_NUMBER; i++) {
+      String instance = PARTICIPANT_PREFIX + "_" + (PARTICIPANT_START_PORT + 
i);
+      _gSetupTool.addInstanceToCluster(CLUSTER_NAME, instance);
+      _instances.add(instance);
+    }
+
+    // start dummy participants
+    for (int i = 0; i < PARTICIPANT_NUMBER; i++) {
+      MockParticipantManager participant =
+          new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, _instances.get(i));
+      participant.syncStart();
+      _participants.add(participant);
+    }
+
+    createDBInSemiAuto(_gSetupTool, CLUSTER_NAME, TEST_DB, _instances, 
STATE_MODEL,
+        PARTITION_NUMBER, REPLICA_NUMBER);
+
+    // start controller
+    String controllerName = CONTROLLER_PREFIX + "_0";
+    _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, 
controllerName);
+    _controller.syncStart();
+
+    // start spectators - each one backs a MockRoutingTableProvider created below
+    _spectator = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, 
"spectator",
+        InstanceType.SPECTATOR, ZK_ADDR);
+    _spectator.connect();
+
+    _spectator_2 = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, 
"spectator_2",
+        InstanceType.SPECTATOR, ZK_ADDR);
+    _spectator_2.connect();
+
+    _spectator_3 = HelixManagerFactory.getZKHelixManager(CLUSTER_NAME, 
"spectator_3",
+        InstanceType.SPECTATOR, ZK_ADDR);
+    _spectator_3.connect();
+
+    _routingTableProvider =
+        new MockRoutingTableProvider(_spectator, PropertyType.EXTERNALVIEW, 
true, 1000L);
+    _spectator.addExternalViewChangeListener(_routingTableProvider);
+    _spectator.addLiveInstanceChangeListener(_routingTableProvider);
+    _spectator.addInstanceConfigChangeListener(_routingTableProvider);
+
+    _routingTableProviderNoPeriodicRefresh =
+        new MockRoutingTableProvider(_spectator_2, PropertyType.EXTERNALVIEW, 
false, 1000L);
+    
_spectator_2.addExternalViewChangeListener(_routingTableProviderNoPeriodicRefresh);
+    
_spectator_2.addLiveInstanceChangeListener(_routingTableProviderNoPeriodicRefresh);
+    
_spectator_2.addInstanceConfigChangeListener(_routingTableProviderNoPeriodicRefresh);
+
+    _routingTableProviderLongPeriodicRefresh =
+        new MockRoutingTableProvider(_spectator_3, PropertyType.EXTERNALVIEW, 
true, 3000000L);
+    
_spectator_3.addExternalViewChangeListener(_routingTableProviderLongPeriodicRefresh);
+    
_spectator_3.addLiveInstanceChangeListener(_routingTableProviderLongPeriodicRefresh);
+    
_spectator_3.addInstanceConfigChangeListener(_routingTableProviderLongPeriodicRefresh);
+
+    _clusterVerifier =
+        new 
BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkClient(_gZkClient).build();
+    Assert.assertTrue(_clusterVerifier.verify());
+
+  }
+
+  @AfterClass
+  public void afterClass() {
+    // stop participants
+    for (MockParticipantManager p : _participants) {
+      p.syncStop();
+    }
+
+    _controller.syncStop();
+    _spectator.disconnect();
+    _spectator_2.disconnect();
+    _spectator_3.disconnect();
+    _gSetupTool.deleteCluster(CLUSTER_NAME);
+  }
+
+  public class MockRoutingTableProvider extends RoutingTableProvider {
+    private volatile int _refreshCount = 0;
+    private static final boolean DEBUG = false;
+
+    public MockRoutingTableProvider(HelixManager helixManager, PropertyType 
sourceDataType,
+        boolean isPeriodicRefreshEnabled, long periodRefreshInterval) {
+      super(helixManager, sourceDataType, isPeriodicRefreshEnabled, 
periodRefreshInterval);
+    }
+
+    @Override
+    public synchronized void refresh(List<ExternalView> externalViewList,
+        NotificationContext changeContext) {
+      super.refresh(externalViewList, changeContext);
+      _refreshCount++;
+      if (DEBUG) {
+        print();
+      }
+    }
+
+    @Override
+    public synchronized void refresh(Collection<ExternalView> externalViews,
+        Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> 
liveInstances) {
+      super.refresh(externalViews, instanceConfigs, liveInstances);
+      _refreshCount++;
+      if (DEBUG) {
+        print();
+      }
+    }
+
+    @Override
+    protected synchronized void refresh(
+        Map<String, Map<String, Map<String, CurrentState>>> currentStateMap,
+        Collection<InstanceConfig> instanceConfigs, Collection<LiveInstance> 
liveInstances) {
+      super.refresh(currentStateMap, instanceConfigs, liveInstances);
+      _refreshCount++;
+      if (DEBUG) {
+        print();
+      }
+    }
+
+    // Log statements for debugging purposes
+    private void print() {
+      logger.error("Refresh happened; count: {}", getRefreshCount());
+      logger.error("timestamp: {}", System.currentTimeMillis());
+    }
+
+    synchronized int getRefreshCount() {
+      return _refreshCount;
+    }
+  }
+
+  /**
+   * Checks the three providers created in beforeClass(): the one constructed with periodic
+   * refresh enabled and an interval of 1000L must keep refreshing without any cluster change,
+   * while the one with periodic refresh disabled and the one with an interval of 3000000L must
+   * not refresh during the observation window.
+   *
+   * @throws InterruptedException if the test thread is interrupted while sleeping
+   */
+  @Test
+  public void testPeriodicRefresh() throws InterruptedException {
+    try {
+      // Wait so that initial refreshes finish (not triggered by periodic refresh timer)
+      Thread.sleep(1000L);
+
+      // Test short refresh
+      int prevRefreshCount = _routingTableProvider.getRefreshCount();
+      // Wait one and a half timer periods so that at least one tick has fired and its refresh
+      // has been counted even when the timer and this sleep are slightly out of phase.
+      Thread.sleep(1500L);
+      // Only require the count to grow: demanding exactly one extra refresh makes the test fail
+      // whenever scheduling jitter lets a second tick land inside the window.
+      Assert.assertTrue(_routingTableProvider.getRefreshCount() > prevRefreshCount,
+          "Periodic refresh did not trigger within 1500 ms");
+
+      // Test no periodic refresh
+      prevRefreshCount = _routingTableProviderNoPeriodicRefresh.getRefreshCount();
+      // Wait
+      Thread.sleep(2000);
+      // The timer should NOT have gone off, the refresh count must stay the same
+      Assert.assertEquals(_routingTableProviderNoPeriodicRefresh.getRefreshCount(),
+          prevRefreshCount);
+
+      // Test long periodic refresh
+      prevRefreshCount = _routingTableProviderLongPeriodicRefresh.getRefreshCount();
+      // Wait
+      Thread.sleep(2000);
+      // The timer should NOT have gone off yet, the refresh count must stay the same
+      Assert.assertEquals(_routingTableProviderLongPeriodicRefresh.getRefreshCount(),
+          prevRefreshCount);
+    } finally {
+      // Shut the providers down even when an assertion above fails, so that a failing run does
+      // not skip the shutdown calls. This also exercises the shutdown path itself.
+      _routingTableProvider.shutdown();
+      _routingTableProviderNoPeriodicRefresh.shutdown();
+      _routingTableProviderLongPeriodicRefresh.shutdown();
+    }
+  }
+}

Reply via email to