mcvsubbu commented on a change in pull request #3885: Actively check cluster 
changes if there is no callback for a long time
URL: https://github.com/apache/incubator-pinot/pull/3885#discussion_r260563318
 
 

 ##########
 File path: 
pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java
 ##########
 @@ -18,142 +18,136 @@
  */
 package org.apache.pinot.broker.broker.helix;
 
-import java.util.ArrayList;
+import com.google.common.base.Preconditions;
 import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
+import org.apache.helix.HelixConstants.ChangeType;
 import org.apache.helix.NotificationContext;
+import org.apache.helix.api.listeners.BatchMode;
 import org.apache.helix.api.listeners.ExternalViewChangeListener;
 import org.apache.helix.api.listeners.InstanceConfigChangeListener;
 import org.apache.helix.api.listeners.LiveInstanceChangeListener;
+import org.apache.helix.api.listeners.PreFetch;
 import org.apache.helix.model.ExternalView;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.model.LiveInstance;
-import org.apache.pinot.broker.queryquota.TableQueryQuotaManager;
-import org.apache.pinot.broker.routing.HelixExternalViewBasedRouting;
+import org.apache.pinot.common.metrics.BrokerMeter;
 import org.apache.pinot.common.metrics.BrokerMetrics;
 import org.apache.pinot.common.metrics.BrokerTimer;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
 
 /**
- * Manages the interactions between Helix cluster changes, the routing table 
and the connection pool.
+ * The {@code ClusterChangeMediator} handles the changes from Helix cluster.
+ * <p>
+ * <p>If there is no change callback in 1 hour, proactively check changes so 
that the changes are getting processed even
+ * when callbacks stop working.
+ * <p>NOTE: disable Helix batch-mode and perform deduplication in this class.
+ * <p>NOTE: disable Helix pre-fetch to reduce the ZK accesses.
  */
-public class ClusterChangeMediator implements LiveInstanceChangeListener, 
ExternalViewChangeListener, InstanceConfigChangeListener {
+@BatchMode(enabled = false)
+@PreFetch(enabled = false)
+public class ClusterChangeMediator implements ExternalViewChangeListener, 
InstanceConfigChangeListener, LiveInstanceChangeListener {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ClusterChangeMediator.class);
-  private final HelixExternalViewBasedRouting _helixExternalViewBasedRouting;
-  private final TableQueryQuotaManager _tableQueryQuotaManager;
 
-  private enum UpdateType {
-    EXTERNAL_VIEW, INSTANCE_CONFIG
-  }
+  private static final long PROACTIVE_CHANGE_CHECK_INTERVAL_MS = 3600 * 1000L;
+
+  private final Map<ChangeType, ClusterChangeHandler> _changeHandlerMap;
+  private final Map<ChangeType, Long> _lastChangeTimeMap = new 
ConcurrentHashMap<>();
+  private final Map<ChangeType, Long> _lastProcessTimeMap = new 
ConcurrentHashMap<>();
 
-  private final LinkedBlockingQueue<Pair<UpdateType, Long>> 
_clusterChangeQueue = new LinkedBlockingQueue<>(1000);
+  private final Thread _clusterChangeHandlingThread;
 
-  private Thread _deferredClusterUpdater = null;
+  public ClusterChangeMediator(Map<ChangeType, ClusterChangeHandler> 
changeHandlerMap, BrokerMetrics brokerMetrics) {
+    _changeHandlerMap = changeHandlerMap;
 
-  public ClusterChangeMediator(HelixExternalViewBasedRouting 
helixExternalViewBasedRouting,
-      TableQueryQuotaManager tableQueryQuotaManager, final BrokerMetrics 
brokerMetrics) {
-    _helixExternalViewBasedRouting = helixExternalViewBasedRouting;
-    _tableQueryQuotaManager = tableQueryQuotaManager;
+    // Initialize last process time map
+    long initTime = System.currentTimeMillis();
+    for (ChangeType changeType : changeHandlerMap.keySet()) {
+      _lastProcessTimeMap.put(changeType, initTime);
+    }
 
-    // Simple thread that polls every 10 seconds to check if there are any 
cluster updates to apply
-    _deferredClusterUpdater = new Thread("Deferred cluster state updater") {
+    _clusterChangeHandlingThread = new Thread("ClusterChangeHandlingThread") {
       @Override
       public void run() {
         while (true) {
           try {
-            // Wait for at least one update
-            Pair<UpdateType, Long> firstUpdate = _clusterChangeQueue.take();
-
-            // Update the queue time metrics
-            long queueTime = System.currentTimeMillis() - 
firstUpdate.getValue();
-            
brokerMetrics.addTimedValue(BrokerTimer.ROUTING_TABLE_UPDATE_QUEUE_TIME, 
queueTime, TimeUnit.MILLISECONDS);
-
-            // Take all other updates also present
-            List<Pair<UpdateType, Long>> allUpdates = new ArrayList<>();
-            allUpdates.add(firstUpdate);
-            _clusterChangeQueue.drainTo(allUpdates);
-
-            // Gather all update types
-            boolean externalViewUpdated = false;
-            boolean instanceConfigUpdated = false;
-
-            for (Pair<UpdateType, Long> update : allUpdates) {
-              if (update.getKey() == UpdateType.EXTERNAL_VIEW) {
-                externalViewUpdated = true;
-              } else if (update.getKey() == UpdateType.INSTANCE_CONFIG) {
-                instanceConfigUpdated = true;
-              }
-            }
-
-            if (externalViewUpdated) {
-              try {
-                _helixExternalViewBasedRouting.processExternalViewChange();
-                _tableQueryQuotaManager.processQueryQuotaChange();
-              } catch (Exception e) {
-                LOGGER.warn("Caught exception while updating external view", 
e);
+            for (Map.Entry<ChangeType, ClusterChangeHandler> entry : 
_changeHandlerMap.entrySet()) {
+              ChangeType changeType = entry.getKey();
+              ClusterChangeHandler changeHandler = entry.getValue();
+              long currentTime = System.currentTimeMillis();
+              Long lastChangeTime = _lastChangeTimeMap.remove(changeType);
+              if (lastChangeTime != null) {
+                
brokerMetrics.addTimedValue(BrokerTimer.CLUSTER_CHANGE_QUEUE_TIME, currentTime 
- lastChangeTime,
+                    TimeUnit.MILLISECONDS);
+                processClusterChange(changeType, changeHandler);
+              } else {
+                long lastProcessTime = _lastProcessTimeMap.get(changeType);
+                if (currentTime - lastProcessTime > 
PROACTIVE_CHANGE_CHECK_INTERVAL_MS) {
+                  LOGGER.info("Proactive check {} change", changeType);
+                  
brokerMetrics.addMeteredGlobalValue(BrokerMeter.PROACTIVE_CLUSTER_CHANGE_CHECK, 
1L);
+                  processClusterChange(changeType, changeHandler);
+                }
               }
             }
 
-            if (instanceConfigUpdated) {
-              try {
-                _helixExternalViewBasedRouting.processInstanceConfigChange();
-              } catch (Exception e) {
-                LOGGER.warn("Caught exception while processing instance 
config", e);
-              }
-            }
+            // Sleep 1 second after each round of checks
+            Thread.sleep(1000L);
           } catch (InterruptedException e) {
-            LOGGER.warn("Was interrupted while waiting for a cluster change", 
e);
+            LOGGER.warn("Cluster change handling thread is interrupted, 
stopping the thread");
             break;
+          } catch (Exception e) {
+            LOGGER.error("Caught exception while handling changes", e);
           }
         }
-
-        LOGGER.warn("Stopping deferred cluster state update thread");
-        _deferredClusterUpdater = null;
       }
     };
+    _clusterChangeHandlingThread.start();
+  }
 
-    _deferredClusterUpdater.start();
+  private void processClusterChange(ChangeType changeType, 
ClusterChangeHandler changeHandler) {
+    long startTime = System.currentTimeMillis();
+    LOGGER.info("Start processing {} change", changeType);
+    changeHandler.processClusterChange();
+    long endTime = System.currentTimeMillis();
+    LOGGER.info("Finish processing {} change in {}ms", changeType, endTime - 
startTime);
+    _lastProcessTimeMap.put(changeType, endTime);
   }
 
   @Override
   public void onExternalViewChange(List<ExternalView> externalViewList, 
NotificationContext changeContext) {
-    // If the deferred update thread is alive, defer the update
-    if (_deferredClusterUpdater != null && _deferredClusterUpdater.isAlive()) {
-      try {
-        _clusterChangeQueue.put(new ImmutablePair<>(UpdateType.EXTERNAL_VIEW, 
System.currentTimeMillis()));
-      } catch (InterruptedException e) {
-        LOGGER.warn("Was interrupted while trying to add external view change 
to queue", e);
-      }
-    } else {
-      LOGGER.warn(
-          "Deferred cluster updater thread is null or stopped, not deferring 
external view routing table rebuild");
-      _helixExternalViewBasedRouting.processExternalViewChange();
-      _tableQueryQuotaManager.processQueryQuotaChange();
-    }
+    Preconditions.checkState(externalViewList.isEmpty(), "Helix pre-fetch 
should be disabled");
 
 Review comment:
   If I understand correctly, pre-fetch is an annotation on this method. Why 
not add the annotation and set it to false? Another way of putting this 
question: What is an admin supposed to do if they see this precondition fail?

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
[email protected]


With regards,
Apache Git Services

---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to