This is an automated email from the ASF dual-hosted git repository.

jackie pushed a commit to branch cluster_change_mediator
in repository https://gitbox.apache.org/repos/asf/incubator-pinot.git

commit 3d68076bfa32bdc8c68c0f7ce39cb28ae94b798d
Author: Jackie (Xiaotian) Jiang <[email protected]>
AuthorDate: Thu Feb 28 18:50:04 2019 -0800

    In ClusterChangeMediator, remove sleep and make it notify-based
    
    When we get the Helix cluster change callbacks, we want to process them ASAP.
    Replace sleep() with wait() and notify() so that new changes are processed
    immediately.
    
    Testing done:
    - Check that the queue time matches the processing time
    - Run tests without starting the cluster change handling thread
    - Reduce the proactive change check interval to 1 second and check that it
      interacts correctly with Helix callbacks
---
 .../broker/broker/helix/ClusterChangeMediator.java | 37 +++++++++++++++-------
 1 file changed, 25 insertions(+), 12 deletions(-)

diff --git a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java
index 1d2dbe6..728a366 100644
--- a/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java
+++ b/pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java
@@ -18,9 +18,9 @@
  */
 package org.apache.pinot.broker.broker.helix;
 
+import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import org.apache.helix.HelixConstants.ChangeType;
 import org.apache.helix.NotificationContext;
@@ -44,8 +44,8 @@ import org.slf4j.LoggerFactory;
  * <p>
  * <p>If there is no change callback in 1 hour, proactively check changes so 
that the changes are getting processed even
  * when callbacks stop working.
- * <p>NOTE: disable Helix batch-mode and perform deduplication in this class. 
This can give us more control on the
- * frequency of change checks, and let us track the cluster change queue time.
+ * <p>NOTE: disable Helix batch-mode and perform deduplication in this class. 
This can save us the extra threads for
+ * handling Helix batch-mode, and let us track the cluster change queue time.
  * <p>NOTE: disable Helix pre-fetch to reduce the ZK accesses.
  */
 @BatchMode(enabled = false)
@@ -53,14 +53,12 @@ import org.slf4j.LoggerFactory;
 public class ClusterChangeMediator implements ExternalViewChangeListener, 
InstanceConfigChangeListener, LiveInstanceChangeListener {
   private static final Logger LOGGER = 
LoggerFactory.getLogger(ClusterChangeMediator.class);
 
-  // Add 1 second interval between change checks to deduplicate multiple 
changes of the same type
-  private static final long CHANGE_CHECK_INTERVAL_MS = 1000L;
   // If no change got for 1 hour, proactively check changes
   private static final long PROACTIVE_CHANGE_CHECK_INTERVAL_MS = 3600 * 1000L;
 
   private final Map<ChangeType, ClusterChangeHandler> _changeHandlerMap;
-  private final Map<ChangeType, Long> _lastChangeTimeMap = new 
ConcurrentHashMap<>();
-  private final Map<ChangeType, Long> _lastProcessTimeMap = new 
ConcurrentHashMap<>();
+  private final Map<ChangeType, Long> _lastChangeTimeMap = new HashMap<>();
+  private final Map<ChangeType, Long> _lastProcessTimeMap = new HashMap<>();
 
   private final Thread _clusterChangeHandlingThread;
 
@@ -84,7 +82,10 @@ public class ClusterChangeMediator implements 
ExternalViewChangeListener, Instan
               ChangeType changeType = entry.getKey();
               ClusterChangeHandler changeHandler = entry.getValue();
               long currentTime = System.currentTimeMillis();
-              Long lastChangeTime = _lastChangeTimeMap.remove(changeType);
+              Long lastChangeTime;
+              synchronized (_lastChangeTimeMap) {
+                lastChangeTime = _lastChangeTimeMap.remove(changeType);
+              }
               if (lastChangeTime != null) {
                 
brokerMetrics.addTimedValue(BrokerTimer.CLUSTER_CHANGE_QUEUE_TIME, currentTime 
- lastChangeTime,
                     TimeUnit.MILLISECONDS);
@@ -98,10 +99,16 @@ public class ClusterChangeMediator implements 
ExternalViewChangeListener, Instan
                 }
               }
             }
-
-            // Add an interval between change checks to deduplicate multiple 
changes of the same type
-            Thread.sleep(CHANGE_CHECK_INTERVAL_MS);
+            synchronized (_lastChangeTimeMap) {
+              // Wait for at most 1/10 of proactive change check interval if 
no new event received. This can guarantee
+              // that the proactive change check will not be delayed for more 
than 1/10 of the interval. In case of
+              // spurious wakeup, execute the while loop again for the 
proactive change check.
+              if (_lastChangeTimeMap.isEmpty()) {
+                _lastChangeTimeMap.wait(PROACTIVE_CHANGE_CHECK_INTERVAL_MS / 
10);
+              }
+            }
           } catch (Exception e) {
+            // Ignore all exceptions. The thread keeps running until 
ClusterChangeMediator.stop() is invoked.
             LOGGER.error("Caught exception within cluster change handling 
thread", e);
           }
         }
@@ -132,6 +139,9 @@ public class ClusterChangeMediator implements 
ExternalViewChangeListener, Instan
   public void stop() {
     LOGGER.info("Stopping the cluster change handling thread");
     _stopped = true;
+    synchronized (_lastChangeTimeMap) {
+      _lastChangeTimeMap.notify();
+    }
     try {
       _clusterChangeHandlingThread.join();
     } catch (InterruptedException e) {
@@ -173,7 +183,10 @@ public class ClusterChangeMediator implements 
ExternalViewChangeListener, Instan
   private void enqueueChange(ChangeType changeType) {
     if (_clusterChangeHandlingThread.isAlive()) {
       LOGGER.info("Enqueue {} change", changeType);
-      _lastChangeTimeMap.put(changeType, System.currentTimeMillis());
+      synchronized (_lastChangeTimeMap) {
+        _lastChangeTimeMap.putIfAbsent(changeType, System.currentTimeMillis());
+        _lastChangeTimeMap.notify();
+      }
     } else {
       LOGGER.error("Cluster change handling thread is not alive, directly 
process the {} change", changeType);
       processClusterChange(changeType, _changeHandlerMap.get(changeType));


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to