mcvsubbu commented on a change in pull request #3885: Actively check cluster
changes if there is no callback for a long time
URL: https://github.com/apache/incubator-pinot/pull/3885#discussion_r260561435
##########
File path:
pinot-broker/src/main/java/org/apache/pinot/broker/broker/helix/ClusterChangeMediator.java
##########
@@ -18,142 +18,136 @@
*/
package org.apache.pinot.broker.broker.helix;
-import java.util.ArrayList;
+import com.google.common.base.Preconditions;
import java.util.List;
-import java.util.concurrent.LinkedBlockingQueue;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
-import org.apache.commons.lang3.tuple.ImmutablePair;
-import org.apache.commons.lang3.tuple.Pair;
+import org.apache.helix.HelixConstants.ChangeType;
import org.apache.helix.NotificationContext;
+import org.apache.helix.api.listeners.BatchMode;
import org.apache.helix.api.listeners.ExternalViewChangeListener;
import org.apache.helix.api.listeners.InstanceConfigChangeListener;
import org.apache.helix.api.listeners.LiveInstanceChangeListener;
+import org.apache.helix.api.listeners.PreFetch;
import org.apache.helix.model.ExternalView;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.model.LiveInstance;
-import org.apache.pinot.broker.queryquota.TableQueryQuotaManager;
-import org.apache.pinot.broker.routing.HelixExternalViewBasedRouting;
+import org.apache.pinot.common.metrics.BrokerMeter;
import org.apache.pinot.common.metrics.BrokerMetrics;
import org.apache.pinot.common.metrics.BrokerTimer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
- * Manages the interactions between Helix cluster changes, the routing table
and the connection pool.
+ * The {@code ClusterChangeMediator} handles the changes from Helix cluster.
+ * <p>
+ * <p>If there is no change callback in 1 hour, proactively check changes so
that the changes are getting processed even
+ * when callbacks stop working.
+ * <p>NOTE: disable Helix batch-mode and perform deduplication in this class.
+ * <p>NOTE: disable Helix pre-fetch to reduce the ZK accesses.
*/
-public class ClusterChangeMediator implements LiveInstanceChangeListener,
ExternalViewChangeListener, InstanceConfigChangeListener {
+@BatchMode(enabled = false)
+@PreFetch(enabled = false)
+public class ClusterChangeMediator implements ExternalViewChangeListener,
InstanceConfigChangeListener, LiveInstanceChangeListener {
private static final Logger LOGGER =
LoggerFactory.getLogger(ClusterChangeMediator.class);
- private final HelixExternalViewBasedRouting _helixExternalViewBasedRouting;
- private final TableQueryQuotaManager _tableQueryQuotaManager;
- private enum UpdateType {
- EXTERNAL_VIEW, INSTANCE_CONFIG
- }
+ private static final long PROACTIVE_CHANGE_CHECK_INTERVAL_MS = 3600 * 1000L;
+
+ private final Map<ChangeType, ClusterChangeHandler> _changeHandlerMap;
+ private final Map<ChangeType, Long> _lastChangeTimeMap = new
ConcurrentHashMap<>();
+ private final Map<ChangeType, Long> _lastProcessTimeMap = new
ConcurrentHashMap<>();
- private final LinkedBlockingQueue<Pair<UpdateType, Long>>
_clusterChangeQueue = new LinkedBlockingQueue<>(1000);
+ private final Thread _clusterChangeHandlingThread;
- private Thread _deferredClusterUpdater = null;
+ public ClusterChangeMediator(Map<ChangeType, ClusterChangeHandler>
changeHandlerMap, BrokerMetrics brokerMetrics) {
+ _changeHandlerMap = changeHandlerMap;
- public ClusterChangeMediator(HelixExternalViewBasedRouting
helixExternalViewBasedRouting,
- TableQueryQuotaManager tableQueryQuotaManager, final BrokerMetrics
brokerMetrics) {
- _helixExternalViewBasedRouting = helixExternalViewBasedRouting;
- _tableQueryQuotaManager = tableQueryQuotaManager;
+ // Initialize last process time map
+ long initTime = System.currentTimeMillis();
+ for (ChangeType changeType : changeHandlerMap.keySet()) {
+ _lastProcessTimeMap.put(changeType, initTime);
+ }
- // Simple thread that polls every 10 seconds to check if there are any
cluster updates to apply
- _deferredClusterUpdater = new Thread("Deferred cluster state updater") {
+ _clusterChangeHandlingThread = new Thread("ClusterChangeHandlingThread") {
@Override
public void run() {
while (true) {
try {
- // Wait for at least one update
- Pair<UpdateType, Long> firstUpdate = _clusterChangeQueue.take();
-
- // Update the queue time metrics
- long queueTime = System.currentTimeMillis() -
firstUpdate.getValue();
-
brokerMetrics.addTimedValue(BrokerTimer.ROUTING_TABLE_UPDATE_QUEUE_TIME,
queueTime, TimeUnit.MILLISECONDS);
-
- // Take all other updates also present
- List<Pair<UpdateType, Long>> allUpdates = new ArrayList<>();
- allUpdates.add(firstUpdate);
- _clusterChangeQueue.drainTo(allUpdates);
-
- // Gather all update types
- boolean externalViewUpdated = false;
- boolean instanceConfigUpdated = false;
-
- for (Pair<UpdateType, Long> update : allUpdates) {
- if (update.getKey() == UpdateType.EXTERNAL_VIEW) {
- externalViewUpdated = true;
- } else if (update.getKey() == UpdateType.INSTANCE_CONFIG) {
- instanceConfigUpdated = true;
- }
- }
-
- if (externalViewUpdated) {
- try {
- _helixExternalViewBasedRouting.processExternalViewChange();
- _tableQueryQuotaManager.processQueryQuotaChange();
- } catch (Exception e) {
- LOGGER.warn("Caught exception while updating external view",
e);
+ for (Map.Entry<ChangeType, ClusterChangeHandler> entry :
_changeHandlerMap.entrySet()) {
+ ChangeType changeType = entry.getKey();
+ ClusterChangeHandler changeHandler = entry.getValue();
+ long currentTime = System.currentTimeMillis();
+ Long lastChangeTime = _lastChangeTimeMap.remove(changeType);
+ if (lastChangeTime != null) {
+
brokerMetrics.addTimedValue(BrokerTimer.CLUSTER_CHANGE_QUEUE_TIME, currentTime
- lastChangeTime,
+ TimeUnit.MILLISECONDS);
+ processClusterChange(changeType, changeHandler);
+ } else {
+ long lastProcessTime = _lastProcessTimeMap.get(changeType);
+ if (currentTime - lastProcessTime >
PROACTIVE_CHANGE_CHECK_INTERVAL_MS) {
Review comment:
Am I reading this correctly: the first update will not be processed until 1
hour after startup? Maybe we should initialize the last-process times to 0 in line 70 instead?
----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on GitHub and use the
URL above to go to the specific comment.
For queries about this service, please contact Infrastructure at:
[email protected]
With regards,
Apache Git Services
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]