This is an automated email from the ASF dual-hosted git repository. jxue pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit 19b6d7c1db3fdce261e9f035c87e155e06315b80 Author: Lei Xia <[email protected]> AuthorDate: Wed Jan 30 10:48:29 2019 -0800 Fix race condition in P2P message handling that can cause long mastership handoff problem. --- .../helix/common/caches/CurrentStateCache.java | 4 +- .../helix/common/caches/InstanceMessagesCache.java | 348 +++++++++++++-------- .../helix/controller/GenericHelixController.java | 93 +++++- .../ResourceControllerDataProvider.java | 5 +- .../WorkflowControllerDataProvider.java | 4 +- .../helix/controller/stages/ClusterEvent.java | 2 +- .../helix/controller/stages/ClusterEventType.java | 1 + .../controller/stages/MessageGenerationPhase.java | 8 +- .../BestPossibleExternalViewVerifier.java | 1 + .../helix/integration/TestEnableCompression.java | 2 +- .../messaging/TestP2PMessageSemiAuto.java | 2 +- .../messaging/TestP2PSingleTopState.java | 214 +++++++++++++ .../helix/participant/MockZKHelixManager.java | 5 +- 13 files changed, 531 insertions(+), 158 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java index 01c8aa9..38ac745 100644 --- a/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java +++ b/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java @@ -103,10 +103,10 @@ public class CurrentStateCache extends AbstractDataCache<CurrentState> { long endTime = System.currentTimeMillis(); LogUtil.logInfo(LOG, genEventInfo(), "END: CurrentStateCache.refresh() for cluster " + _controlContextProvider.getClusterName() - + ", took " + (endTime - startTime) + " ms"); + + ", started at : " + startTime + ", took " + (endTime - startTime) + " ms"); if (LOG.isDebugEnabled()) { LogUtil.logDebug(LOG, genEventInfo(), - String.format("Current State freshed : %s", _currentStateMap.toString())); + String.format("Current State refreshed : %s", _currentStateMap.toString())); } return true; } diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java index 9fa9136..b57ffb9 100644 --- a/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java +++ b/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java @@ -31,6 +31,7 @@ import java.util.Map; import java.util.Set; import org.apache.helix.HelixDataAccessor; import org.apache.helix.PropertyKey; +import org.apache.helix.controller.GenericHelixController; import org.apache.helix.model.CurrentState; import org.apache.helix.model.LiveInstance; import org.apache.helix.model.Message; @@ -54,22 +55,21 @@ public class InstanceMessagesCache { // <instance -> {<MessageId, Message>}> private Map<String, Map<String, Message>> _relayMessageCache = Maps.newHashMap(); + // Map of a relay message to its original hosted message. + private Map<String, Message> _relayHostMessageCache = Maps.newHashMap(); - // TODO: Temporary workaround to void participant receiving duplicated state transition messages when p2p is enable. - // should remove this once all clients are migrated to 0.8.2. -- Lei - private Map<String, Message> _committedRelayMessages = Maps.newHashMap(); + public static final String RELAY_MESSAGE_LIFETIME = "helix.controller.messagecache.relaymessagelifetime"; - public static final String COMMIT_MESSAGE_EXPIRY_CONFIG = "helix.controller.messagecache.commitmessageexpiry"; - - private static final int DEFAULT_COMMIT_RELAY_MESSAGE_EXPIRY = 20 * 1000; // 20 seconds - private final int _commitMessageExpiry; + // If Helix missed all of other events to evict a relay message from the cache, it will delete the message anyway after this timeout. + private static final int DEFAULT_RELAY_MESSAGE_LIFETIME = 120 * 1000; // in ms + private final int _relayMessageLifetime; private String _clusterName; public InstanceMessagesCache(String clusterName) { _clusterName = clusterName; - _commitMessageExpiry = HelixUtil - .getSystemPropertyAsInt(COMMIT_MESSAGE_EXPIRY_CONFIG, DEFAULT_COMMIT_RELAY_MESSAGE_EXPIRY); + _relayMessageLifetime = HelixUtil + .getSystemPropertyAsInt(RELAY_MESSAGE_LIFETIME, DEFAULT_RELAY_MESSAGE_LIFETIME); } /** @@ -137,195 +137,265 @@ public class InstanceMessagesCache { _messageMap = Collections.unmodifiableMap(msgMap); if (LOG.isDebugEnabled()) { - LOG.debug("Message purge took: " + purgeSum); - LOG.debug("# of Messages read from ZooKeeper " + newMessageKeys.size() + ". took " + ( - System.currentTimeMillis() - startTime) + " ms."); - } + LOG.debug("Message purge took: {} ", purgeSum); - LOG.info("END: InstanceMessagesCache.refresh()"); + } + LOG.info( + "END: InstanceMessagesCache.refresh(), {} of Messages read from ZooKeeper. took {} ms. ", + newMessageKeys.size(), (System.currentTimeMillis() - startTime)); return true; } - // update all valid relay messages attached to existing state transition messages into message map. + /** + * Refresh relay message cache by updating relay messages read from ZK, and remove all expired relay messages. + */ public void updateRelayMessages(Map<String, LiveInstance> liveInstanceMap, Map<String, Map<String, Map<String, CurrentState>>> currentStateMap) { - // refresh _relayMessageCache + // cache all relay messages read from ZK for (String instance : _messageMap.keySet()) { Map<String, Message> instanceMessages = _messageMap.get(instance); - Map<String, Map<String, CurrentState>> instanceCurrentStateMap = - currentStateMap.get(instance); - if (instanceCurrentStateMap == null) { - continue; - } - for (Message message : instanceMessages.values()) { if (message.hasRelayMessages()) { - String sessionId = message.getTgtSessionId(); - String resourceName = message.getResourceName(); - String partitionName = message.getPartitionName(); - String targetState = message.getToState(); - String instanceSessionId = liveInstanceMap.get(instance).getSessionId(); - - if (!instanceSessionId.equals(sessionId)) { - LOG.info("Instance SessionId does not match, ignore relay messages attached to message " - + message.getId()); - continue; - } - - Map<String, CurrentState> sessionCurrentStateMap = instanceCurrentStateMap.get(sessionId); - if (sessionCurrentStateMap == null) { - LOG.info("No sessionCurrentStateMap found, ignore relay messages attached to message " - + message.getId()); - continue; - } - CurrentState currentState = sessionCurrentStateMap.get(resourceName); - if (currentState == null || !targetState.equals(currentState.getState(partitionName))) { - LOG.info("CurrentState " + currentState - + " do not match the target state of the message, ignore relay messages attached to message " - + message.getId()); - continue; - } - long transitionCompleteTime = currentState.getEndTime(partitionName); - for (Message relayMsg : message.getRelayMessages().values()) { - relayMsg.setRelayTime(transitionCompleteTime); - cacheRelayMessage(relayMsg); + cacheRelayMessage(relayMsg, message); } } } } Map<String, Map<String, Message>> relayMessageMap = new HashMap<>(); - // refresh _relayMessageMap + long nextRebalanceTime = Long.MAX_VALUE; + + // Iterate all relay message in the cache, remove invalid or expired ones. for (String instance : _relayMessageCache.keySet()) { - Map<String, Message> messages = _relayMessageCache.get(instance); - Map<String, Map<String, CurrentState>> instanceCurrentStateMap = - currentStateMap.get(instance); - if (instanceCurrentStateMap == null) { - continue; - } + Map<String, Message> relayMessages = _relayMessageCache.get(instance); + Iterator<Map.Entry<String, Message>> iterator = relayMessages.entrySet().iterator(); - Iterator<Map.Entry<String, Message>> iterator = messages.entrySet().iterator(); while (iterator.hasNext()) { - Message message = iterator.next().getValue(); - String sessionId = message.getTgtSessionId(); - String resourceName = message.getResourceName(); - String partitionName = message.getPartitionName(); - String targetState = message.getToState(); - String instanceSessionId = liveInstanceMap.get(instance).getSessionId(); + Message relayMessage = iterator.next().getValue(); Map<String, Message> instanceMsgMap = _messageMap.get(instance); - - if (instanceMsgMap != null && instanceMsgMap.containsKey(message.getMsgId())) { - Message commitMessage = instanceMsgMap.get(message.getMsgId()); - - if (!commitMessage.isRelayMessage()) { - LOG.info( - "Controller already sent the message to the target host, remove relay message from the cache" - + message.getId()); + if (instanceMsgMap != null && instanceMsgMap.containsKey(relayMessage.getMsgId())) { + Message committedMessage = instanceMsgMap.get(relayMessage.getMsgId()); + if (committedMessage.isRelayMessage()) { + LOG.info("Relay message committed, remove relay message {} from the cache.", relayMessage + .getId()); iterator.remove(); - _committedRelayMessages.remove(message.getMsgId()); + _relayHostMessageCache.remove(relayMessage.getMsgId()); continue; } else { - // relay message has already been sent to target host - // remember when the relay messages get relayed to the target host. - if (!_committedRelayMessages.containsKey(message.getMsgId())) { - message.setRelayTime(System.currentTimeMillis()); - _committedRelayMessages.put(message.getMsgId(), message); - LOG.info("Put message into committed relay messages " + message.getId()); + // controller already sent the same message to target host, + // To avoid potential race-condition, do not remove relay message immediately, + // just set the relay time as current time . + if (relayMessage.getRelayTime() < 0) { + relayMessage.setRelayTime(System.currentTimeMillis()); + LOG.info( + "Controller already sent the message {} to the target host, set message to be relayed at {}", + relayMessage.getId(), relayMessage.getRelayTime()); } } } + String sessionId = relayMessage.getTgtSessionId(); + String instanceSessionId = liveInstanceMap.get(instance).getSessionId(); + + // Target host's session has been changed, remove relay message if (!instanceSessionId.equals(sessionId)) { - LOG.info( - "Instance SessionId does not match, remove relay message from the cache" + message - .getId()); + LOG.info("Instance SessionId does not match, remove relay message {} from the cache.", relayMessage.getId()); iterator.remove(); + _relayHostMessageCache.remove(relayMessage.getMsgId()); continue; } + Map<String, Map<String, CurrentState>> instanceCurrentStateMap = + currentStateMap.get(instance); + if (instanceCurrentStateMap == null) { + LOG.warn("CurrentStateMap null for " + instance); + continue; + } Map<String, CurrentState> sessionCurrentStateMap = instanceCurrentStateMap.get(sessionId); if (sessionCurrentStateMap == null) { - LOG.info("No sessionCurrentStateMap found, ignore relay message from the cache" + message - .getId()); + LOG.warn("CurrentStateMap null for {}, session {}.", instance, sessionId); continue; } + String resourceName = relayMessage.getResourceName(); + String partitionName = relayMessage.getPartitionName(); + String targetState = relayMessage.getToState(); + String fromState = relayMessage.getFromState(); CurrentState currentState = sessionCurrentStateMap.get(resourceName); - if (currentState != null && targetState.equals(currentState.getState(partitionName))) { - LOG.info("CurrentState " + currentState - + " match the target state of the relay message, remove relay from cache." + message - .getId()); + + long currentTime = System.currentTimeMillis(); + if (currentState == null) { + if (relayMessage.getRelayTime() < 0) { + relayMessage.setRelayTime(currentTime); + LOG.warn("CurrentState is null for {} on {}, set relay time {} for message {}", + resourceName, instance, relayMessage.getRelayTime(), relayMessage.getId()); + } + } + + // if partition state on the target host already changed, set it to be expired. + String partitionState = currentState.getState(partitionName); + if (targetState.equals(partitionState) || !fromState.equals(partitionState)) { + if (relayMessage.getRelayTime() < 0) { + relayMessage.setRelayTime(currentTime); + LOG.debug( + "CurrentState {} on target host has changed, set relay time {} for message {}.", + partitionState, relayMessage.getRelayTime(), relayMessage.getId()); + } + } + + // derive relay time from hosted message and relayed host. + setRelayTime(relayMessage, liveInstanceMap, currentStateMap); + + if (relayMessage.isExpired()) { + LOG.info("relay message {} expired, remove it from cache. relay time {}.", + relayMessage.getId(), relayMessage.getRelayTime()); iterator.remove(); + _relayHostMessageCache.remove(relayMessage.getMsgId()); continue; } - if (message.isExpired()) { - LOG.error("relay message has not been sent " + message.getId() - + " expired, remove it from cache. relay time: " + message.getRelayTime()); + // If Helix missed all of other events to evict a relay message from the cache, + // it will delete the message anyway after a certain timeout. + // This is the latest resort to avoid a relay message stuck in the cache forever. + // This case should happen very rarely. + if (relayMessage.getRelayTime() < 0 + && (relayMessage.getCreateTimeStamp() + _relayMessageLifetime) < System + .currentTimeMillis()) { + LOG.info( + "relay message {} has reached its lifetime, remove it from cache.", relayMessage.getId()); iterator.remove(); + _relayHostMessageCache.remove(relayMessage.getMsgId()); continue; } + long expiryTime = relayMessage.getCreateTimeStamp() + _relayMessageLifetime; + if (relayMessage.getRelayTime() > 0) { + expiryTime = relayMessage.getRelayTime() + relayMessage.getExpiryPeriod(); + } + + if (expiryTime < nextRebalanceTime) { + nextRebalanceTime = expiryTime; + } + if (!relayMessageMap.containsKey(instance)) { relayMessageMap.put(instance, Maps.<String, Message>newHashMap()); } - relayMessageMap.get(instance).put(message.getMsgId(), message); + relayMessageMap.get(instance).put(relayMessage.getMsgId(), relayMessage); } } + if (nextRebalanceTime < Long.MAX_VALUE) { + scheduleFuturePipeline(nextRebalanceTime); + } + _relayMessageMap = Collections.unmodifiableMap(relayMessageMap); - // TODO: this is a workaround, remove this once the participants are all in 0.8.2, - checkCommittedRelayMessages(currentStateMap); + long relayCount = 0; + // Add valid relay message to the instance message map. + for (String instance : _relayMessageMap.keySet()) { + Map<String, Message> relayMessages = _relayMessageMap.get(instance); + if (!_messageMap.containsKey(instance)) { + _messageMap.put(instance, Maps.<String, Message>newHashMap()); + } + _messageMap.get(instance).putAll(relayMessages); + relayCount += relayMessages.size(); + } + if (LOG.isDebugEnabled()) { + if (relayCount > 0) { + LOG.debug("Relay message cache size " + relayCount); + } + } } - // TODO: this is a workaround, once the participants are all in 0.8.2, - private void checkCommittedRelayMessages(Map<String, Map<String, Map<String, CurrentState>>> currentStateMap) { - Iterator<Map.Entry<String, Message>> it = _committedRelayMessages.entrySet().iterator(); - while (it.hasNext()) { - Message message = it.next().getValue(); - - String resourceName = message.getResourceName(); - String partitionName = message.getPartitionName(); - String targetState = message.getToState(); - String instance = message.getTgtName(); - String sessionId = message.getTgtSessionId(); - - long committedTime = message.getRelayTime(); - if (committedTime + _commitMessageExpiry < System.currentTimeMillis()) { - LOG.info("relay message " + message.getMsgId() - + " is expired after committed, remove it from committed message cache."); - it.remove(); - continue; - } + // Schedule a future rebalance pipeline run. + private void scheduleFuturePipeline(long rebalanceTime) { + GenericHelixController controller = GenericHelixController.getController(_clusterName); + if (controller != null) { + controller.scheduleRebalance(rebalanceTime); + } else { + LOG.warn( + "Failed to schedule a future pipeline run for cluster {} at delay {}, helix controller is null.", + _clusterName, (rebalanceTime - System.currentTimeMillis())); + } + } - Map<String, Map<String, CurrentState>> instanceCurrentStateMap = - currentStateMap.get(instance); - if (instanceCurrentStateMap == null || instanceCurrentStateMap.get(sessionId) == null) { - LOG.info( - "No sessionCurrentStateMap found, remove it from committed message cache." + message - .getId()); - it.remove(); - continue; - } + private void setRelayTime(Message relayMessage, Map<String, LiveInstance> liveInstanceMap, + Map<String, Map<String, Map<String, CurrentState>>> currentStateMap) { + + // relay time already set, avoid to reset it to a later time. + if (relayMessage.getRelayTime() > relayMessage.getCreateTimeStamp()) { + return; + } + + Message hostedMessage = _relayHostMessageCache.get(relayMessage.getMsgId()); + String sessionId = hostedMessage.getTgtSessionId(); + String instance = hostedMessage.getTgtName(); + String resourceName = hostedMessage.getResourceName(); + String instanceSessionId = liveInstanceMap.get(instance).getSessionId(); + + long currentTime = System.currentTimeMillis(); + long expiredTime = currentTime + relayMessage.getExpiryPeriod(); + + if (!instanceSessionId.equals(sessionId)) { + LOG.debug( + "Hosted Instance SessionId {} does not match sessionId {} in hosted message , set relay message {} to be expired at {}, hosted message ", + instanceSessionId, sessionId, relayMessage.getId(), expiredTime, + hostedMessage.getMsgId()); + relayMessage.setRelayTime(currentTime); + return; + } - Map<String, CurrentState> sessionCurrentStateMap = instanceCurrentStateMap.get(sessionId); - CurrentState currentState = sessionCurrentStateMap.get(resourceName); - if (currentState != null && targetState.equals(currentState.getState(partitionName))) { - LOG.info("CurrentState " + currentState - + " match the target state of the relay message, remove it from committed message cache." - + message.getId()); - it.remove(); - continue; + Map<String, Map<String, CurrentState>> instanceCurrentStateMap = currentStateMap.get(instance); + if (instanceCurrentStateMap == null) { + LOG.debug( + "No instanceCurrentStateMap found for {} on {}, set relay messages {} to be expired at {}" + + resourceName, instance, relayMessage.getId(), expiredTime); + relayMessage.setRelayTime(currentTime); + return; + } + + Map<String, CurrentState> sessionCurrentStateMap = instanceCurrentStateMap.get(sessionId); + if (sessionCurrentStateMap == null) { + LOG.debug("No sessionCurrentStateMap found, set relay messages {} to be expired at {}. ", + relayMessage.getId(), expiredTime); + relayMessage.setRelayTime(currentTime); + return; + } + + String partitionName = hostedMessage.getPartitionName(); + String targetState = hostedMessage.getToState(); + String fromState = hostedMessage.getFromState(); + + CurrentState currentState = sessionCurrentStateMap.get(resourceName); + if (currentState == null) { + LOG.debug("No currentState found for {} on {}, set relay message {} to be expired at {} ", + resourceName, instance, relayMessage.getId(), + (currentTime + relayMessage.getExpiryPeriod())); + relayMessage.setRelayTime(currentTime); + return; + } + + if (targetState.equals(currentState.getState(partitionName))) { + long completeTime = currentState.getEndTime(partitionName); + if (completeTime < relayMessage.getCreateTimeStamp()) { + completeTime = currentTime; } + relayMessage.setRelayTime(completeTime); + LOG.debug( + "Target state match the hosted message's target state, set relay message {} relay time at {}.", + relayMessage.getId(), completeTime); + } - Map<String, Message> cachedMap = _messageMap.get(message.getTgtName()); - cachedMap.put(message.getId(), message); + if (!fromState.equals(currentState.getState(partitionName))) { + LOG.debug( + "Current state does not match hosted message's from state, set relay message {} relay time at {}.", + relayMessage.getId(), currentTime); + relayMessage.setRelayTime(currentTime); } } @@ -367,20 +437,22 @@ public class InstanceMessagesCache { if (message.hasRelayMessages()) { for (Message relayMsg : message.getRelayMessages().values()) { - cacheRelayMessage(relayMsg); + cacheRelayMessage(relayMsg, message); } } } } - protected void cacheRelayMessage(Message message) { - String instanceName = message.getTgtName(); + private void cacheRelayMessage(Message relayMessage, Message hostMessage) { + String instanceName = relayMessage.getTgtName(); if (!_relayMessageCache.containsKey(instanceName)) { _relayMessageCache.put(instanceName, Maps.<String, Message>newHashMap()); } - _relayMessageCache.get(instanceName).put(message.getId(), message); + _relayMessageCache.get(instanceName).put(relayMessage.getId(), relayMessage); + _relayHostMessageCache.put(relayMessage.getMsgId(), hostMessage); - LOG.info("Add message to relay cache " + message.getMsgId()); + LOG.info("Add relay message to relay cache " + relayMessage.getMsgId() + ", hosted message " + + hostMessage.getMsgId()); } @Override public String toString() { diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java index dda4552..a8f2872 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java +++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java @@ -21,6 +21,7 @@ package org.apache.helix.controller; import com.google.common.collect.Sets; import java.util.ArrayList; +import java.util.Arrays; import java.util.Collections; import java.util.HashMap; import java.util.List; @@ -29,10 +30,12 @@ import java.util.Set; import java.util.Timer; import java.util.TimerTask; import java.util.UUID; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.ThreadFactory; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import org.I0Itec.zkclient.exception.ZkInterruptedException; import org.apache.helix.HelixDataAccessor; @@ -156,9 +159,16 @@ public class GenericHelixController implements IdealStateChangeListener, * one resource group has the config to use the timer. */ Timer _periodicalRebalanceTimer = null; - long _timerPeriod = Long.MAX_VALUE; + + /** + * The timer that triggers the on-demand rebalance pipeline. + */ + Timer _onDemandRebalanceTimer = null; + AtomicReference<RebalanceTask> _nextRebalanceTask = new AtomicReference<>(); + + /** * A cache maintained across pipelines */ @@ -174,6 +184,19 @@ public class GenericHelixController implements IdealStateChangeListener, private String _clusterName; private final Set<Pipeline.Type> _enabledPipelineTypes; + private HelixManager _helixManager; + + /** + * TODO: We should get rid of this once we move to: + * 1) ZK callback should go to ClusterDataCache and trigger data cache refresh only + * 2) then ClusterDataCache.refresh triggers rebalance pipeline. + */ + /* Map of cluster->GenrichelixController */ + private static Map<String, GenericHelixController> HelixControllerFactory = new ConcurrentHashMap<>(); + public static GenericHelixController getController(String clusterName) { + return HelixControllerFactory.get(clusterName); + } + /** * Default constructor that creates a default pipeline registry. This is sufficient in most cases, * but if there is a some thing specific needed use another constructor where in you can pass a @@ -198,16 +221,28 @@ public class GenericHelixController implements IdealStateChangeListener, class RebalanceTask extends TimerTask { final HelixManager _manager; final ClusterEventType _clusterEventType; + private long _nextRebalanceTime; public RebalanceTask(HelixManager manager, ClusterEventType clusterEventType) { + this(manager, clusterEventType, -1); + + } + + public RebalanceTask(HelixManager manager, ClusterEventType clusterEventType, long nextRebalanceTime) { _manager = manager; _clusterEventType = clusterEventType; + _nextRebalanceTime = nextRebalanceTime; + } + + public long getNextRebalanceTime() { + return _nextRebalanceTime; } @Override public void run() { try { - if (_clusterEventType.equals(ClusterEventType.PeriodicalRebalance)) { + if (_clusterEventType.equals(ClusterEventType.PeriodicalRebalance) || _clusterEventType + .equals(ClusterEventType.OnDemandRebalance)) { requestDataProvidersFullRefresh(); HelixDataAccessor accessor = _manager.getHelixDataAccessor(); @@ -224,7 +259,6 @@ public class GenericHelixController implements IdealStateChangeListener, } } } - forceRebalance(_manager, _clusterEventType); } catch (Throwable ex) { logger.error("Time task failed. Rebalance task type: " + _clusterEventType + ", cluster: " @@ -247,7 +281,7 @@ public class GenericHelixController implements IdealStateChangeListener, enqueueEvent(_taskEventQueue, event); enqueueEvent(_eventQueue, event.clone(uid)); logger.info(String - .format("Controller rebalance event triggered with event type: %s for cluster %s", + .format("Controller rebalance pipeline triggered with event type: %s for cluster %s", eventType, _clusterName)); } @@ -284,6 +318,46 @@ public class GenericHelixController implements IdealStateChangeListener, } } + /** + * schedule a future rebalance pipeline run, delayed at given time. + */ + public void scheduleRebalance(long rebalanceTime) { + if (_helixManager == null) { + logger.warn( + "Failed to schedule a future pipeline run for cluster " + _clusterName + " helix manager is null!"); + return; + } + + long current = System.currentTimeMillis(); + long delay = rebalanceTime - current; + + if (rebalanceTime > current) { + if (_onDemandRebalanceTimer == null) { + _onDemandRebalanceTimer = new Timer(true); + } + + RebalanceTask preTask = _nextRebalanceTask.get(); + if (preTask != null && preTask.getNextRebalanceTime() > current + && preTask.getNextRebalanceTime() < rebalanceTime) { + // already have a earlier rebalance scheduled, no need to schedule again. + return; + } + + RebalanceTask newTask = + new RebalanceTask(_helixManager, ClusterEventType.OnDemandRebalance, rebalanceTime); + + _onDemandRebalanceTimer.schedule(newTask, delay); + logger.info( + "Scheduled a future pipeline run for cluster " + _helixManager.getClusterName() + " in delay " + + delay); + + preTask = _nextRebalanceTask.getAndSet(newTask); + if (preTask != null) { + preTask.cancel(); + } + } + } + private static PipelineRegistry createDefaultRegistry(String pipelineName) { logger.info("createDefaultRegistry"); synchronized (GenericHelixController.class) { @@ -335,6 +409,7 @@ public class GenericHelixController implements IdealStateChangeListener, registry.register(ClusterEventType.MessageChange, dataRefresh, dataPreprocess, rebalancePipeline); registry.register(ClusterEventType.Resume, dataRefresh, dataPreprocess, externalViewPipeline, rebalancePipeline); registry.register(ClusterEventType.PeriodicalRebalance, dataRefresh, autoExitMaintenancePipeline, dataPreprocess, externalViewPipeline, rebalancePipeline); + registry.register(ClusterEventType.OnDemandRebalance, dataRefresh, autoExitMaintenancePipeline, dataPreprocess, externalViewPipeline, rebalancePipeline); return registry; } } @@ -442,6 +517,10 @@ public class GenericHelixController implements IdealStateChangeListener, _taskEventQueue = null; _taskEventThread = null; } + + if (clusterName != null) { + HelixControllerFactory.put(clusterName, this); + } } private void initializeAsyncFIFOWorkers() { @@ -493,6 +572,8 @@ public class GenericHelixController implements IdealStateChangeListener, return; } + _helixManager = manager; + // TODO If init controller with paused = true, it may not take effect immediately // _paused is default false. If any events come before controllerChangeEvent, the controller // will be excuting in un-paused mode. Which might not be the config in ZK. @@ -540,7 +621,7 @@ public class GenericHelixController implements IdealStateChangeListener, .getPipelinesForEvent(event.getEventType()); isTaskFrameworkPipeline = true; } else { - logger.info(String + logger.warn(String .format("No %s pipeline to run for event: %s::%s", dataProvider.getPipelineName(), event.getEventType(), event.getEventId())); return; @@ -561,7 +642,7 @@ public class GenericHelixController implements IdealStateChangeListener, } catch (Exception e) { logger.error( "Exception while executing {} pipeline: {} for cluster {}. Will not continue to next pipeline", - dataProvider.getPipelineName(), _clusterName, e); + dataProvider.getPipelineName(), _clusterName, Arrays.toString(e.getStackTrace())); if (e instanceof HelixMetaDataAccessException) { rebalanceFail = true; diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java index 84013ba..2ab4526 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java +++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java @@ -108,6 +108,7 @@ public class ResourceControllerDataProvider extends BaseControllerDataProvider { public synchronized void refresh(HelixDataAccessor accessor) { long startTime = System.currentTimeMillis(); + // Invalidate cached information if (_propertyDataChangedMap.get(HelixConstants.ChangeType.IDEAL_STATE) || _propertyDataChangedMap.get(HelixConstants.ChangeType.LIVE_INSTANCE) @@ -123,8 +124,8 @@ public class ResourceControllerDataProvider extends BaseControllerDataProvider { refreshExternalViews(accessor); refreshTargetExternalViews(accessor); LogUtil.logInfo(logger, getClusterEventId(), String.format( - "END: WorkflowControllerDataProvider.refresh() for cluster %s took %s for %s pipeline", - getClusterName(), System.currentTimeMillis() - startTime, getPipelineName())); + "END: ResourceControllerDataProvider.refresh() for cluster %s, started at %d took %d for %s pipeline", + getClusterName(), startTime, System.currentTimeMillis() - startTime, getPipelineName())); dumpDebugInfo(); } diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java index 3416e22..2eee09e 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java +++ b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/WorkflowControllerDataProvider.java @@ -104,8 +104,8 @@ public class WorkflowControllerDataProvider extends BaseControllerDataProvider { long duration = System.currentTimeMillis() - startTime; LogUtil.logInfo(logger, getClusterEventId(), String.format( - "END: WorkflowControllerDataProvider.refresh() for cluster %s took %s for %s pipeline", - getClusterName(), duration, getPipelineName())); + "END: WorkflowControllerDataProvider.refresh() for cluster %s, started at %d took %d for %s pipeline", + getClusterName(), startTime, duration, getPipelineName())); dumpDebugInfo(); } diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEvent.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEvent.java index a65e9f0..b09c297 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEvent.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEvent.java @@ -105,7 +105,7 @@ public class ClusterEvent { @Override public String toString() { StringBuilder sb = new StringBuilder(); - sb.append(String.format("Event id : %s", _eventId.toString())); + sb.append(String.format("Event id : %s", _eventId)); sb.append("name:" + _eventType.name()).append("\n"); for (String key : _eventAttributeMap.keySet()) { sb.append(key).append(":").append(_eventAttributeMap.get(key)).append("\n"); diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java index 5312c35..7e9ab15 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterEventType.java @@ -32,6 +32,7 @@ public enum ClusterEventType { TargetExternalViewChange, Resume, PeriodicalRebalance, + OnDemandRebalance, RetryRebalance, StateVerifier, Unknown diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java index d266c7d..2894a49 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/MessageGenerationPhase.java @@ -207,21 +207,21 @@ public abstract class MessageGenerationPhase extends AbstractBaseStage { if (pendingMessage != null) { String pendingState = pendingMessage.getToState(); if (nextState.equalsIgnoreCase(pendingState)) { - LogUtil.logDebug(logger, _eventId, + LogUtil.logInfo(logger, _eventId, "Message already exists for " + instanceName + " to transit " + resource .getResourceName() + "." + partition.getPartitionName() + " from " - + currentState + " to " + nextState); + + currentState + " to " + nextState + ", isRelay: " + pendingMessage.isRelayMessage()); } else if (currentState.equalsIgnoreCase(pendingState)) { LogUtil.logInfo(logger, _eventId, "Message hasn't been removed for " + instanceName + " to transit " + resource .getResourceName() + "." + partition.getPartitionName() + " to " - + pendingState + ", desiredState: " + desiredState); + + pendingState + ", desiredState: " + desiredState + ", isRelay: " + pendingMessage.isRelayMessage()); } else { LogUtil.logInfo(logger, _eventId, "IdealState changed before state transition completes for " + resource .getResourceName() + "." + partition.getPartitionName() + " on " + instanceName + ", pendingState: " + pendingState + ", currentState: " - + currentState + ", nextState: " + nextState); + + currentState + ", nextState: " + nextState + ", isRelay: " + pendingMessage.isRelayMessage()); message = createStateTransitionCancellationMessage(manager, resource, partition.getPartitionName(), instanceName, sessionIdMap.get(instanceName), diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java index 898bd63..77cbcca 100644 --- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java +++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java @@ -196,6 +196,7 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier { _dataProvider.requireFullRefresh(); _dataProvider.refresh(_accessor); + _dataProvider.setClusterEventId("ClusterStateVerifier"); Map<String, IdealState> idealStates = new HashMap<>(_dataProvider.getIdealStates()); diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestEnableCompression.java b/helix-core/src/test/java/org/apache/helix/integration/TestEnableCompression.java index daf24fb..cde62ae 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestEnableCompression.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestEnableCompression.java @@ -110,7 +110,7 @@ public class TestEnableCompression extends ZkTestBase { Assert.assertTrue(result); List<String> compressedPaths = new ArrayList<String>(); - findCompressedZNodes(zkClient, "/", compressedPaths); + findCompressedZNodes(zkClient, "/" + clusterName, compressedPaths); System.out.println("compressed paths:" + compressedPaths); // ONLY IDEALSTATE and EXTERNAL VIEW must be compressed diff --git a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java index 92dfa13..9dad564 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java +++ b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PMessageSemiAuto.java @@ -186,7 +186,7 @@ public class TestP2PMessageSemiAuto extends ZkTestBase { } private void verifyP2PMessage(String dbName, String instance, String expectedState, String expectedTriggerHost) { - verifyP2PMessage(dbName, instance, expectedState, expectedTriggerHost, 0.7); + verifyP2PMessage(dbName, instance, expectedState, expectedTriggerHost, 0.6); } private void verifyP2PMessage(String dbName, String instance, String expectedState, String expectedTriggerHost, double expectedRatio) { diff --git a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PSingleTopState.java b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PSingleTopState.java new file mode 100644 index 0000000..a1b37b2 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestP2PSingleTopState.java @@ -0,0 +1,214 @@ +package org.apache.helix.integration.messaging; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.ArrayList; +import java.util.Date; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicLong; +import org.apache.helix.ConfigAccessor; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.NotificationContext; +import org.apache.helix.common.ZkTestBase; +import org.apache.helix.controller.rebalancer.strategy.CrushEdRebalanceStrategy; +import org.apache.helix.integration.DelayedTransitionBase; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.manager.zk.ZKHelixDataAccessor; +import org.apache.helix.mock.participant.MockTransition; +import org.apache.helix.model.BuiltInStateModelDefinitions; +import org.apache.helix.model.MasterSlaveSMD; +import org.apache.helix.model.Message; +import org.apache.helix.tools.ClusterVerifiers.BestPossibleExternalViewVerifier; +import org.apache.helix.tools.ClusterVerifiers.ZkHelixClusterVerifier; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +public class TestP2PSingleTopState extends ZkTestBase { + private static Logger logger = LoggerFactory.getLogger(TestP2PSingleTopState.class); + + final String CLASS_NAME = getShortClassName(); + final String CLUSTER_NAME = CLUSTER_PREFIX + "_" + CLASS_NAME; + + static final int PARTICIPANT_NUMBER = 24; + static final int PARTICIPANT_START_PORT = 12918; + + static final int DB_COUNT = 2; + + static final int PARTITION_NUMBER = 50; + static final int REPLICA_NUMBER = 3; + + final String _controllerName = CONTROLLER_PREFIX + "_0"; + + List<MockParticipantManager> _participants = new ArrayList<>(); + List<String> _instances = new ArrayList<>(); + ClusterControllerManager _controller; + + ZkHelixClusterVerifier _clusterVerifier; + ConfigAccessor _configAccessor; + HelixDataAccessor _accessor; + + @BeforeClass + public void beforeClass() { + System.out + .println("START " + getShortClassName() + " at " + new Date(System.currentTimeMillis())); + + // setup storage cluster + _gSetupTool.addCluster(CLUSTER_NAME, true); + + for (int i = 0; i < PARTICIPANT_NUMBER / 2; i++) { + String instance = PARTICIPANT_PREFIX + "_" + (PARTICIPANT_START_PORT + i); + _gSetupTool.addInstanceToCluster(CLUSTER_NAME, instance); + _instances.add(instance); + } + + // start dummy participants + for (int i = 0; i < PARTICIPANT_NUMBER / 2; i++) { + MockParticipantManager participant = + new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, _instances.get(i)); + participant.setTransition(new DelayedTransitionBase(100)); + participant.syncStart(); + participant.setTransition(new TestTransition(participant.getInstanceName())); + _participants.add(participant); + } + + _configAccessor = new ConfigAccessor(_gZkClient); + _accessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor); + + enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, true, 1000000); + //enableDelayRebalanceInCluster(_gZkClient, CLUSTER_NAME, false); + enablePersistBestPossibleAssignment(_gZkClient, CLUSTER_NAME, true); + enableP2PInCluster(CLUSTER_NAME, _configAccessor, true); + + // start controller + _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, _controllerName); + _controller.syncStart(); + + for (int i = 0; i < DB_COUNT; i++) { + createResourceWithDelayedRebalance(CLUSTER_NAME, "TestDB_" + i, + BuiltInStateModelDefinitions.MasterSlave.name(), PARTITION_NUMBER, REPLICA_NUMBER, + REPLICA_NUMBER - 1, 1000000L, CrushEdRebalanceStrategy.class.getName()); + } + + _clusterVerifier = + new BestPossibleExternalViewVerifier.Builder(CLUSTER_NAME).setZkClient(_gZkClient).build(); + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + } + + @AfterClass + public void afterClass() throws Exception { + _controller.syncStop(); + for (MockParticipantManager p : _participants) { + if (p.isConnected()) { + p.syncStop(); + } + } + deleteCluster(CLUSTER_NAME); + System.out.println("END " + CLASS_NAME + " at " + new Date(System.currentTimeMillis())); + } + + @Test + public void testRollingUpgrade() throws InterruptedException { + // rolling upgrade the cluster + for (String ins : _instances) { + System.out.println("Disable " + ins); + _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, ins, false); + Thread.sleep (1000); + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + System.out.println("Enable " + ins); + _gSetupTool.getClusterManagementTool().enableInstance(CLUSTER_NAME, ins, true); + Thread.sleep(1000); + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + } + + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + Assert.assertFalse(TestTransition.duplicatedPartitionsSnapshot.keys().hasMoreElements()); + } + + @Test + public void testAddInstances() throws InterruptedException { + for (int i = PARTICIPANT_NUMBER / 2; i < PARTICIPANT_NUMBER; i++) { + String instance = PARTICIPANT_PREFIX + "_" + (PARTICIPANT_START_PORT + i); + _gSetupTool.addInstanceToCluster(CLUSTER_NAME, instance); + _instances.add(instance); + MockParticipantManager participant = + new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, _instances.get(i)); + participant.setTransition(new DelayedTransitionBase(100)); + participant.syncStart(); + participant.setTransition(new TestTransition(participant.getInstanceName())); + _participants.add(participant); + Thread.sleep(100); + } + + Assert.assertTrue(_clusterVerifier.verifyByPolling()); + Assert.assertFalse(TestTransition.duplicatedPartitionsSnapshot.keys().hasMoreElements()); + } + + static class TestTransition extends MockTransition { + // resource.partition that having top state. + static ConcurrentHashMap<String, Map<String, String>> duplicatedPartitionsSnapshot = + new ConcurrentHashMap<>(); + static ConcurrentHashMap<String, Map<String, String>> ExternalViews = new ConcurrentHashMap<>(); + static AtomicLong totalToMaster = new AtomicLong(); + static AtomicLong totalRelayMessage = new AtomicLong(); + + String _instanceName; + + public TestTransition(String instanceName) { + _instanceName = instanceName; + } + + public void doTransition(Message message, NotificationContext context) { + String to = message.getToState(); + String resource = message.getResourceName(); + String partition = message.getPartitionName(); + String key = resource + "." + partition; + + if (to.equals(MasterSlaveSMD.States.MASTER.name())) { + Map<String, String> mapFields = new HashMap<>(ExternalViews.get(key)); + if (mapFields.values().contains(MasterSlaveSMD.States.MASTER.name())) { + Map<String, String> newMapFile = new HashMap<>(mapFields); + newMapFile.put(_instanceName, to); + duplicatedPartitionsSnapshot.put(key, newMapFile); + } + + totalToMaster.incrementAndGet(); + if (message.isRelayMessage()) { + totalRelayMessage.incrementAndGet(); + } + } + + ExternalViews.putIfAbsent(key, new ConcurrentHashMap<String, String>()); + if (to.equalsIgnoreCase("DROPPED")) { + ExternalViews.get(key).remove(_instanceName); + } else { + ExternalViews.get(key).put(_instanceName, to); + } + } + } +} + diff --git a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java index 983a6a3..5fcb307 100644 --- a/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java +++ b/helix-core/src/test/java/org/apache/helix/participant/MockZKHelixManager.java @@ -51,6 +51,8 @@ import org.apache.helix.manager.zk.client.HelixZkClient; import org.apache.helix.messaging.DefaultMessagingService; import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty; import org.apache.helix.store.zk.ZkHelixPropertyStore; +import org.apache.helix.task.TaskConstants; +import org.testng.collections.Lists; public class MockZKHelixManager implements HelixManager { private final ZKHelixDataAccessor _accessor; @@ -281,7 +283,8 @@ public class MockZKHelixManager implements HelixManager { @Override public ZkHelixPropertyStore<ZNRecord> getHelixPropertyStore() { // TODO Auto-generated method stub - return null; + return new ZkHelixPropertyStore<>( + (ZkBaseDataAccessor<ZNRecord>) _accessor.getBaseDataAccessor(), TaskConstants.REBALANCER_CONTEXT_ROOT, Lists.<String>newArrayList()); } @Override
