Refactor ClusterDataCache, break it into small cache components, including CurrentStateCache, InstanceMessagesCache and TaskDataCache, and put the refresh logic into each cache component itself.
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/ae02b58d Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/ae02b58d Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/ae02b58d Branch: refs/heads/master Commit: ae02b58dc4880fe1f63bc66e0fdf93a63fd1d03f Parents: 0e84903 Author: Lei Xia <[email protected]> Authored: Mon Feb 5 17:26:24 2018 -0800 Committer: Lei Xia <[email protected]> Committed: Tue Mar 20 11:31:47 2018 -0700 ---------------------------------------------------------------------- .../helix/common/BasicClusterDataCache.java | 180 --------- .../common/caches/BasicClusterDataCache.java | 199 +++++++++ .../helix/common/caches/CurrentStateCache.java | 206 ++++++++++ .../common/caches/InstanceMessagesCache.java | 222 ++++++++++ .../helix/common/caches/TaskDataCache.java | 232 +++++++++++ .../controller/stages/ClusterDataCache.java | 402 +++---------------- .../helix/spectator/RoutingDataCache.java | 2 +- 7 files changed, 912 insertions(+), 531 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/ae02b58d/helix-core/src/main/java/org/apache/helix/common/BasicClusterDataCache.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/common/BasicClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/common/BasicClusterDataCache.java deleted file mode 100644 index d48cfbb..0000000 --- a/helix-core/src/main/java/org/apache/helix/common/BasicClusterDataCache.java +++ /dev/null @@ -1,180 +0,0 @@ -package org.apache.helix.common; - -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. 
The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ - -import java.util.Collections; -import java.util.HashMap; -import java.util.Map; -import java.util.concurrent.ConcurrentHashMap; -import org.apache.helix.HelixConstants; -import org.apache.helix.HelixDataAccessor; -import org.apache.helix.HelixException; -import org.apache.helix.PropertyKey; -import org.apache.helix.PropertyType; -import org.apache.helix.model.ExternalView; -import org.apache.helix.model.InstanceConfig; -import org.apache.helix.model.LiveInstance; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - -/** - * Cache the cluster data - */ -public class BasicClusterDataCache { - protected final Logger LOG = LoggerFactory.getLogger(this.getClass().getName()); - - private Map<String, LiveInstance> _liveInstanceMap; - private Map<String, InstanceConfig> _instanceConfigMap; - private Map<String, ExternalView> _externalViewMap; - private final PropertyType _sourceDataType; - - protected String _clusterName; - - protected Map<HelixConstants.ChangeType, Boolean> _propertyDataChangedMap; - - public BasicClusterDataCache(String clusterName) { - this(clusterName, PropertyType.EXTERNALVIEW); - } - - public BasicClusterDataCache(String clusterName, PropertyType sourceDataType) { - _propertyDataChangedMap = new ConcurrentHashMap<>(); - _liveInstanceMap = new HashMap<>(); - _instanceConfigMap = new HashMap<>(); - _externalViewMap = new HashMap<>(); - _clusterName = 
clusterName; - _sourceDataType = sourceDataType; - } - - /** - * This refreshes the cluster data by re-fetching the data from zookeeper in an efficient way - * - * @param accessor - * - * @return - */ - public synchronized void refresh(HelixDataAccessor accessor) { - LOG.info("START: ClusterDataCache.refresh() for cluster " + _clusterName); - long startTime = System.currentTimeMillis(); - PropertyKey.Builder keyBuilder = accessor.keyBuilder(); - - if (_propertyDataChangedMap.get(HelixConstants.ChangeType.EXTERNAL_VIEW)) { - long start = System.currentTimeMillis(); - _propertyDataChangedMap.put(HelixConstants.ChangeType.EXTERNAL_VIEW, Boolean.valueOf(false)); - switch (_sourceDataType) { - case EXTERNALVIEW: - _externalViewMap = accessor.getChildValuesMap(keyBuilder.externalViews()); - break; - case TARGETEXTERNALVIEW: - _externalViewMap = accessor.getChildValuesMap(keyBuilder.targetExternalViews()); - break; - default: - break; - } - if (LOG.isDebugEnabled()) { - LOG.debug("Reload ExternalViews: " + _externalViewMap.keySet() + ". 
Takes " + ( - System.currentTimeMillis() - start) + " ms"); - } - } - - if (_propertyDataChangedMap.get(HelixConstants.ChangeType.LIVE_INSTANCE)) { - _propertyDataChangedMap.put(HelixConstants.ChangeType.LIVE_INSTANCE, Boolean.valueOf(false)); - _liveInstanceMap = accessor.getChildValuesMap(keyBuilder.liveInstances()); - LOG.debug("Reload LiveInstances: " + _liveInstanceMap.keySet()); - } - - if (_propertyDataChangedMap.get(HelixConstants.ChangeType.INSTANCE_CONFIG)) { - _propertyDataChangedMap - .put(HelixConstants.ChangeType.INSTANCE_CONFIG, Boolean.valueOf(false)); - _instanceConfigMap = accessor.getChildValuesMap(keyBuilder.instanceConfigs()); - LOG.debug("Reload InstanceConfig: " + _instanceConfigMap.keySet()); - } - - long endTime = System.currentTimeMillis(); - LOG.info( - "END: RoutingDataCache.refresh() for cluster " + _clusterName + ", took " + (endTime - - startTime) + " ms"); - - if (LOG.isDebugEnabled()) { - LOG.debug("LiveInstances: " + _liveInstanceMap.keySet()); - for (LiveInstance instance : _liveInstanceMap.values()) { - LOG.debug("live instance: " + instance.getInstanceName() + " " + instance.getSessionId()); - } - LOG.debug("ExternalViews: " + _externalViewMap.keySet()); - LOG.debug("InstanceConfigs: " + _instanceConfigMap.keySet()); - } - } - - /** - * Retrieves the ExternalView for all resources - * - * @return - */ - public Map<String, ExternalView> getExternalViews() { - return Collections.unmodifiableMap(_externalViewMap); - } - - /** - * Returns the LiveInstances for each of the instances that are curretnly up and running - * - * @return - */ - public Map<String, LiveInstance> getLiveInstances() { - return Collections.unmodifiableMap(_liveInstanceMap); - } - - /** - * Returns the instance config map - * - * @return - */ - public Map<String, InstanceConfig> getInstanceConfigMap() { - return Collections.unmodifiableMap(_instanceConfigMap); - } - - /** - * Notify the cache that some part of the cluster data has been changed. 
- */ - public synchronized void notifyDataChange(HelixConstants.ChangeType changeType) { - _propertyDataChangedMap.put(changeType, Boolean.valueOf(true)); - } - - /** - * Indicate that a full read should be done on the next refresh - */ - public synchronized void requireFullRefresh() { - for(HelixConstants.ChangeType type : HelixConstants.ChangeType.values()) { - _propertyDataChangedMap.put(type, Boolean.valueOf(true)); - } - } - - /** - * toString method to print the data cache state - */ - @Override - public String toString() { - StringBuilder sb = new StringBuilder(); - sb.append("liveInstaceMap:" + _liveInstanceMap).append("\n"); - sb.append("externalViewMap:" + _externalViewMap).append("\n"); - sb.append("instanceConfigMap:" + _instanceConfigMap).append("\n"); - - return sb.toString(); - } -} - http://git-wip-us.apache.org/repos/asf/helix/blob/ae02b58d/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java new file mode 100644 index 0000000..f470272 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/common/caches/BasicClusterDataCache.java @@ -0,0 +1,199 @@ +package org.apache.helix.common.caches; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.concurrent.ConcurrentHashMap; +import org.apache.helix.HelixConstants; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.PropertyKey; +import org.apache.helix.PropertyType; +import org.apache.helix.model.ExternalView; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.model.LiveInstance; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Cache the basic cluster data, including LiveInstances, InstanceConfigs and ExternalViews. 
+ */ +public class BasicClusterDataCache { + protected final Logger LOG = LoggerFactory.getLogger(this.getClass().getName()); + + private Map<String, LiveInstance> _liveInstanceMap; + private Map<String, InstanceConfig> _instanceConfigMap; + private Map<String, ExternalView> _externalViewMap; + private final PropertyType _sourceDataType; + + protected String _clusterName; + + protected Map<HelixConstants.ChangeType, Boolean> _propertyDataChangedMap; + + public BasicClusterDataCache(String clusterName) { + this(clusterName, PropertyType.EXTERNALVIEW); + } + + public BasicClusterDataCache(String clusterName, PropertyType sourceDataType) { + _propertyDataChangedMap = new ConcurrentHashMap<>(); + _liveInstanceMap = new HashMap<>(); + _instanceConfigMap = new HashMap<>(); + _externalViewMap = new HashMap<>(); + _clusterName = clusterName; + _sourceDataType = sourceDataType; + } + + /** + * This refreshes the cluster data by re-fetching the data from zookeeper in an efficient way + * + * @param accessor + * + * @return + */ + public synchronized void refresh(HelixDataAccessor accessor) { + LOG.info("START: ClusterDataCache.refresh() for cluster " + _clusterName); + long startTime = System.currentTimeMillis(); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); + + if (_propertyDataChangedMap.get(HelixConstants.ChangeType.EXTERNAL_VIEW)) { + long start = System.currentTimeMillis(); + _propertyDataChangedMap.put(HelixConstants.ChangeType.EXTERNAL_VIEW, Boolean.valueOf(false)); + switch (_sourceDataType) { + case EXTERNALVIEW: + _externalViewMap = accessor.getChildValuesMap(keyBuilder.externalViews()); + break; + case TARGETEXTERNALVIEW: + _externalViewMap = accessor.getChildValuesMap(keyBuilder.targetExternalViews()); + break; + default: + break; + } + if (LOG.isDebugEnabled()) { + LOG.debug("Reload ExternalViews: " + _externalViewMap.keySet() + ". 
Takes " + ( + System.currentTimeMillis() - start) + " ms"); + } + } + + if (_propertyDataChangedMap.get(HelixConstants.ChangeType.LIVE_INSTANCE)) { + _propertyDataChangedMap.put(HelixConstants.ChangeType.LIVE_INSTANCE, Boolean.valueOf(false)); + _liveInstanceMap = accessor.getChildValuesMap(keyBuilder.liveInstances()); + LOG.debug("Reload LiveInstances: " + _liveInstanceMap.keySet()); + } + + if (_propertyDataChangedMap.get(HelixConstants.ChangeType.INSTANCE_CONFIG)) { + _propertyDataChangedMap + .put(HelixConstants.ChangeType.INSTANCE_CONFIG, Boolean.valueOf(false)); + _instanceConfigMap = accessor.getChildValuesMap(keyBuilder.instanceConfigs()); + LOG.debug("Reload InstanceConfig: " + _instanceConfigMap.keySet()); + } + + long endTime = System.currentTimeMillis(); + LOG.info( + "END: RoutingDataCache.refresh() for cluster " + _clusterName + ", took " + (endTime + - startTime) + " ms"); + + if (LOG.isDebugEnabled()) { + LOG.debug("LiveInstances: " + _liveInstanceMap.keySet()); + for (LiveInstance instance : _liveInstanceMap.values()) { + LOG.debug("live instance: " + instance.getInstanceName() + " " + instance.getSessionId()); + } + LOG.debug("ExternalViews: " + _externalViewMap.keySet()); + LOG.debug("InstanceConfigs: " + _instanceConfigMap.keySet()); + } + } + + /** + * Retrieves the ExternalView for all resources + * + * @return + */ + public Map<String, ExternalView> getExternalViews() { + return Collections.unmodifiableMap(_externalViewMap); + } + + /** + * Returns the LiveInstances for each of the instances that are curretnly up and running + * + * @return + */ + public Map<String, LiveInstance> getLiveInstances() { + return Collections.unmodifiableMap(_liveInstanceMap); + } + + /** + * Returns the instance config map + * + * @return + */ + public Map<String, InstanceConfig> getInstanceConfigMap() { + return Collections.unmodifiableMap(_instanceConfigMap); + } + + /** + * Notify the cache that some part of the cluster data has been changed. 
+ */ + public synchronized void notifyDataChange(HelixConstants.ChangeType changeType) { + _propertyDataChangedMap.put(changeType, Boolean.valueOf(true)); + } + + /** + * Clear the corresponding cache based on change type + * @param changeType + */ + public synchronized void clearCache(HelixConstants.ChangeType changeType) { + switch (changeType) { + case LIVE_INSTANCE: + _liveInstanceMap.clear(); + break; + case INSTANCE_CONFIG: + _instanceConfigMap.clear(); + break; + case EXTERNAL_VIEW: + _externalViewMap.clear(); + break; + default: + break; + } + } + + /** + * Indicate that a full read should be done on the next refresh + */ + public synchronized void requireFullRefresh() { + for(HelixConstants.ChangeType type : HelixConstants.ChangeType.values()) { + _propertyDataChangedMap.put(type, Boolean.valueOf(true)); + } + } + + /** + * toString method to print the data cache state + */ + @Override + public String toString() { + StringBuilder sb = new StringBuilder(); + sb.append("liveInstaceMap:" + _liveInstanceMap).append("\n"); + sb.append("externalViewMap:" + _externalViewMap).append("\n"); + sb.append("instanceConfigMap:" + _instanceConfigMap).append("\n"); + + return sb.toString(); + } +} + http://git-wip-us.apache.org/repos/asf/helix/blob/ae02b58d/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java new file mode 100644 index 0000000..d921512 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/common/caches/CurrentStateCache.java @@ -0,0 +1,206 @@ +package org.apache.helix.common.caches; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. 
See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.HelixProperty; +import org.apache.helix.PropertyKey; +import org.apache.helix.model.CurrentState; +import org.apache.helix.model.LiveInstance; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Cache to hold all CurrentStates of a cluster. 
+ */ +public class CurrentStateCache { + private static final Logger LOG = LoggerFactory.getLogger(CurrentStateCache.class.getName()); + + private Map<String, Map<String, Map<String, CurrentState>>> _currentStateMap; + private Map<PropertyKey, CurrentState> _currentStateCache = Maps.newHashMap(); + + private String _clusterName; + + public CurrentStateCache(String clusterName) { + _clusterName = clusterName; + _currentStateMap = Collections.emptyMap(); + } + + /** + * This refreshes the CurrentStates data by re-fetching the data from zookeeper in an efficient + * way + * + * @param accessor + * @param liveInstanceMap map of all liveInstances in cluster + * + * @return + */ + public synchronized boolean refresh(HelixDataAccessor accessor, + Map<String, LiveInstance> liveInstanceMap) { + LOG.info("START: CurrentStateCache.refresh()"); + long startTime = System.currentTimeMillis(); + + refreshCurrentStatesCache(accessor, liveInstanceMap); + + Map<String, Map<String, Map<String, CurrentState>>> allCurStateMap = new HashMap<>(); + for (PropertyKey key : _currentStateCache.keySet()) { + CurrentState currentState = _currentStateCache.get(key); + String[] params = key.getParams(); + if (currentState != null && params.length >= 4) { + String instanceName = params[1]; + String sessionId = params[2]; + String stateName = params[3]; + Map<String, Map<String, CurrentState>> instanceCurStateMap = + allCurStateMap.get(instanceName); + if (instanceCurStateMap == null) { + instanceCurStateMap = Maps.newHashMap(); + allCurStateMap.put(instanceName, instanceCurStateMap); + } + Map<String, CurrentState> sessionCurStateMap = instanceCurStateMap.get(sessionId); + if (sessionCurStateMap == null) { + sessionCurStateMap = Maps.newHashMap(); + instanceCurStateMap.put(sessionId, sessionCurStateMap); + } + sessionCurStateMap.put(stateName, currentState); + } + } + + for (String instance : allCurStateMap.keySet()) { + allCurStateMap.put(instance, 
Collections.unmodifiableMap(allCurStateMap.get(instance))); + } + _currentStateMap = Collections.unmodifiableMap(allCurStateMap); + + long endTime = System.currentTimeMillis(); + LOG.info("END: CurrentStateCache.refresh() for cluster " + _clusterName + ", took " + (endTime + - startTime) + " ms"); + return true; + } + + // reload current states that has been changed from zk to local cache. + private void refreshCurrentStatesCache(HelixDataAccessor accessor, + Map<String, LiveInstance> liveInstanceMap) { + long start = System.currentTimeMillis(); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); + + List<PropertyKey> currentStateKeys = Lists.newLinkedList(); + for (String instanceName : liveInstanceMap.keySet()) { + LiveInstance liveInstance = liveInstanceMap.get(instanceName); + String sessionId = liveInstance.getSessionId(); + List<String> currentStateNames = + accessor.getChildNames(keyBuilder.currentStates(instanceName, sessionId)); + for (String currentStateName : currentStateNames) { + currentStateKeys.add(keyBuilder.currentState(instanceName, sessionId, currentStateName)); + } + } + + // All new entries from zk not cached locally yet should be read from ZK. 
+ List<PropertyKey> reloadKeys = Lists.newLinkedList(currentStateKeys); + reloadKeys.removeAll(_currentStateCache.keySet()); + + List<PropertyKey> cachedKeys = Lists.newLinkedList(_currentStateCache.keySet()); + cachedKeys.retainAll(currentStateKeys); + + List<HelixProperty.Stat> stats = accessor.getPropertyStats(cachedKeys); + Map<PropertyKey, CurrentState> currentStatesMap = Maps.newHashMap(); + for (int i = 0; i < cachedKeys.size(); i++) { + PropertyKey key = cachedKeys.get(i); + HelixProperty.Stat stat = stats.get(i); + if (stat != null) { + CurrentState property = _currentStateCache.get(key); + if (property != null && property.getBucketSize() == 0 && property.getStat().equals(stat)) { + currentStatesMap.put(key, property); + } else { + // need update from zk + reloadKeys.add(key); + } + } else { + LOG.debug("stat is null for key: " + key); + reloadKeys.add(key); + } + } + + List<CurrentState> currentStates = accessor.getProperty(reloadKeys); + Iterator<PropertyKey> csKeyIter = reloadKeys.iterator(); + for (CurrentState currentState : currentStates) { + PropertyKey key = csKeyIter.next(); + if (currentState != null) { + currentStatesMap.put(key, currentState); + } else { + LOG.debug("CurrentState null for key: " + key); + } + } + + _currentStateCache = Collections.unmodifiableMap(currentStatesMap); + + if (LOG.isDebugEnabled()) { + LOG.debug("# of CurrentState paths read from ZooKeeper " + reloadKeys.size()); + LOG.debug("# of CurrentState paths skipped reading from ZK: " + (currentStateKeys.size() + - reloadKeys.size())); + } + LOG.info("Takes " + (System.currentTimeMillis() - start) + " ms to reload new current states!"); + } + + /** + * Return CurrentStates map for all instances. + * + * @return + */ + public Map<String, Map<String, Map<String, CurrentState>>> getCurrentStatesMap() { + return Collections.unmodifiableMap(_currentStateMap); + } + + /** + * Return all CurrentState on the given instance. 
+ * + * @param instance + * + * @return + */ + public Map<String, Map<String, CurrentState>> getCurrentStates(String instance) { + if (!_currentStateMap.containsKey(instance)) { + return Collections.emptyMap(); + } + return Collections.unmodifiableMap(_currentStateMap.get(instance)); + } + + /** + * Provides the current state of the node for a given session id, the sessionid can be got from + * LiveInstance + * + * @param instance + * @param clientSessionId + * + * @return + */ + public Map<String, CurrentState> getCurrentState(String instance, String clientSessionId) { + if (!_currentStateMap.containsKey(instance) || !_currentStateMap.get(instance) + .containsKey(clientSessionId)) { + return Collections.emptyMap(); + } + return Collections.unmodifiableMap(_currentStateMap.get(instance).get(clientSessionId)); + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/ae02b58d/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java new file mode 100644 index 0000000..9ac40c3 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/common/caches/InstanceMessagesCache.java @@ -0,0 +1,222 @@ +package org.apache.helix.common.caches; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import com.google.common.collect.Lists; +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.Set; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.PropertyKey; +import org.apache.helix.model.CurrentState; +import org.apache.helix.model.LiveInstance; +import org.apache.helix.model.Message; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Cache for holding pending messages in all instances in the given cluster. + */ +public class InstanceMessagesCache { + private static final Logger LOG = LoggerFactory.getLogger(InstanceMessagesCache.class.getName()); + private Map<String, Map<String, Message>> _messageMap; + + // maintain a cache of participant messages across pipeline runs + private Map<String, Map<String, Message>> _messageCache = Maps.newHashMap(); + private String _clusterName; + + public InstanceMessagesCache(String clusterName) { + _clusterName = clusterName; + } + + /** + * This refreshes all pending messages in the cluster by re-fetching the data from zookeeper in an + * efficient way + * current state must be refreshed before refreshing relay messages because we need to use current + * state to validate all relay messages. 
+ * + * @param accessor + * @param liveInstanceMap + * + * @return + */ + public synchronized boolean refresh(HelixDataAccessor accessor, + Map<String, LiveInstance> liveInstanceMap) { + LOG.info("START: InstanceMessagesCache.refresh()"); + long startTime = System.currentTimeMillis(); + + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); + Map<String, Map<String, Message>> msgMap = new HashMap<>(); + List<PropertyKey> newMessageKeys = Lists.newLinkedList(); + long purgeSum = 0; + for (String instanceName : liveInstanceMap.keySet()) { + // get the cache + Map<String, Message> cachedMap = _messageCache.get(instanceName); + if (cachedMap == null) { + cachedMap = Maps.newHashMap(); + _messageCache.put(instanceName, cachedMap); + } + msgMap.put(instanceName, cachedMap); + + // get the current names + Set<String> messageNames = + Sets.newHashSet(accessor.getChildNames(keyBuilder.messages(instanceName))); + + long purgeStart = System.currentTimeMillis(); + // clear stale names + Iterator<String> cachedNamesIter = cachedMap.keySet().iterator(); + while (cachedNamesIter.hasNext()) { + String messageName = cachedNamesIter.next(); + if (!messageNames.contains(messageName)) { + cachedNamesIter.remove(); + } + } + long purgeEnd = System.currentTimeMillis(); + purgeSum += purgeEnd - purgeStart; + + // get the keys for the new messages + for (String messageName : messageNames) { + if (!cachedMap.containsKey(messageName)) { + newMessageKeys.add(keyBuilder.message(instanceName, messageName)); + } + } + } + + // get the new messages + if (newMessageKeys.size() > 0) { + List<Message> newMessages = accessor.getProperty(newMessageKeys); + for (Message message : newMessages) { + if (message != null) { + Map<String, Message> cachedMap = _messageCache.get(message.getTgtName()); + cachedMap.put(message.getId(), message); + } + } + } + + _messageMap = Collections.unmodifiableMap(msgMap); + + if (LOG.isDebugEnabled()) { + LOG.debug("Message purge took: " + purgeSum); + LOG.debug("# of 
Messages read from ZooKeeper " + newMessageKeys.size() + ". took " + ( + System.currentTimeMillis() - startTime) + " ms."); + } + + return true; + } + + // update all valid relay messages attached to existing state transition messages into message map. + public void updateRelayMessages(Map<String, LiveInstance> liveInstanceMap, + Map<String, Map<String, Map<String, CurrentState>>> currentStateMap) { + List<Message> relayMessages = new ArrayList<>(); + for (String instance : _messageMap.keySet()) { + Map<String, Message> instanceMessages = _messageMap.get(instance); + Map<String, Map<String, CurrentState>> instanceCurrentStateMap = + currentStateMap.get(instance); + if (instanceCurrentStateMap == null) { + continue; + } + + for (Message message : instanceMessages.values()) { + if (message.hasRelayMessages()) { + String sessionId = message.getTgtSessionId(); + String resourceName = message.getResourceName(); + String partitionName = message.getPartitionName(); + String targetState = message.getToState(); + String instanceSessionId = liveInstanceMap.get(instance).getSessionId(); + + if (!instanceSessionId.equals(sessionId)) { + continue; + } + + Map<String, CurrentState> sessionCurrentStateMap = instanceCurrentStateMap.get(sessionId); + if (sessionCurrentStateMap == null) { + continue; + } + CurrentState currentState = sessionCurrentStateMap.get(resourceName); + if (currentState == null || !targetState.equals(currentState.getState(partitionName))) { + continue; + } + long transitionCompleteTime = currentState.getEndTime(partitionName); + + for (Message msg : message.getRelayMessages().values()) { + msg.setRelayTime(transitionCompleteTime); + if (!message.isExpired()) { + relayMessages.add(msg); + } + } + } + } + } + + for (Message message : relayMessages) { + String instance = message.getTgtName(); + Map<String, Message> instanceMessages = _messageMap.get(instance); + if (instanceMessages == null) { + instanceMessages = new HashMap<>(); + _messageMap.put(instance, 
instanceMessages); + } + instanceMessages.put(message.getId(), message); + } + } + + /** + * Provides a list of current outstanding transitions on a given instance. + * + * @param instanceName + * + * @return + */ + public Map<String, Message> getMessages(String instanceName) { + Map<String, Message> map = _messageMap.get(instanceName); + if (map != null) { + return map; + } else { + return Collections.emptyMap(); + } + } + + public void cacheMessages(List<Message> messages) { + for (Message message : messages) { + String instanceName = message.getTgtName(); + Map<String, Message> instMsgMap; + if (_messageCache.containsKey(instanceName)) { + instMsgMap = _messageCache.get(instanceName); + } else { + instMsgMap = Maps.newHashMap(); + _messageCache.put(instanceName, instMsgMap); + } + instMsgMap.put(message.getId(), message); + } + } + + @Override public String toString() { + return "InstanceMessagesCache{" + + "_messageMap=" + _messageMap + + ", _messageCache=" + _messageCache + + ", _clusterName='" + _clusterName + '\'' + + '}'; + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/ae02b58d/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java new file mode 100644 index 0000000..2dbb4f8 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java @@ -0,0 +1,232 @@ +package org.apache.helix.common.caches; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import com.google.common.base.Joiner; +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import org.apache.helix.AccessOption; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.PropertyType; +import org.apache.helix.ZNRecord; +import org.apache.helix.model.ResourceConfig; +import org.apache.helix.task.JobConfig; +import org.apache.helix.task.JobContext; +import org.apache.helix.task.TaskConstants; +import org.apache.helix.task.WorkflowConfig; +import org.apache.helix.task.WorkflowContext; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +/** + * Cache for holding all task related cluster data, such as WorkflowConfig, JobConfig and Contexts. 
+ */ +public class TaskDataCache { + private static final Logger LOG = LoggerFactory.getLogger(TaskDataCache.class.getName()); + private static final String NAME = "NAME"; + + private Map<String, JobConfig> _jobConfigMap = new HashMap<>(); + private Map<String, WorkflowConfig> _workflowConfigMap = new HashMap<>(); + private Map<String, ZNRecord> _contextMap = new HashMap<>(); + + private String _clusterName; + + public TaskDataCache(String clusterName) { + _clusterName = clusterName; + } + + /** + * This refreshes the cluster data by re-fetching the data from zookeeper in an efficient way + * + * @param accessor + * + * @return + */ + public synchronized boolean refresh(HelixDataAccessor accessor, + Map<String, ResourceConfig> resourceConfigMap) { + refreshJobContexts(accessor); + + // update workflow and job configs. + _workflowConfigMap.clear(); + _jobConfigMap.clear(); + for (Map.Entry<String, ResourceConfig> entry : resourceConfigMap.entrySet()) { + if (entry.getValue().getRecord().getSimpleFields() + .containsKey(WorkflowConfig.WorkflowConfigProperty.Dag.name())) { + _workflowConfigMap.put(entry.getKey(), new WorkflowConfig(entry.getValue())); + } else if (entry.getValue().getRecord().getSimpleFields() + .containsKey(WorkflowConfig.WorkflowConfigProperty.WorkflowID.name())) { + _jobConfigMap.put(entry.getKey(), new JobConfig(entry.getValue())); + } + } + + return true; + } + + private void refreshJobContexts(HelixDataAccessor accessor) { + // TODO: Need an optimize for reading context only if the refresh is needed. 
+ long start = System.currentTimeMillis(); + _contextMap.clear(); + if (_clusterName == null) { + return; + } + String path = String.format("/%s/%s%s", _clusterName, PropertyType.PROPERTYSTORE.name(), + TaskConstants.REBALANCER_CONTEXT_ROOT); + List<String> contextPaths = new ArrayList<>(); + List<String> childNames = accessor.getBaseDataAccessor().getChildNames(path, 0); + if (childNames == null) { + return; + } + for (String context : childNames) { + contextPaths.add(Joiner.on("/").join(path, context, TaskConstants.CONTEXT_NODE)); + } + + List<ZNRecord> contexts = accessor.getBaseDataAccessor().get(contextPaths, null, 0); + for (int i = 0; i < contexts.size(); i++) { + ZNRecord context = contexts.get(i); + if (context != null && context.getSimpleField(NAME) != null) { + _contextMap.put(context.getSimpleField(NAME), context); + } else { + _contextMap.put(childNames.get(i), context); + LOG.debug( + String.format("Context for %s is null or miss the context NAME!", childNames.get((i)))); + } + } + + if (LOG.isDebugEnabled()) { + LOG.debug("# of workflow/job context read from zk: " + _contextMap.size() + ". 
Take " + ( + System.currentTimeMillis() - start) + " ms"); + } + } + + /** + * Returns job config map + * + * @return + */ + public Map<String, JobConfig> getJobConfigMap() { + return _jobConfigMap; + } + + /** + * Returns job config + * + * @param resource + * + * @return + */ + public JobConfig getJobConfig(String resource) { + return _jobConfigMap.get(resource); + } + + /** + * Returns workflow config map + * + * @return + */ + public Map<String, WorkflowConfig> getWorkflowConfigMap() { + return _workflowConfigMap; + } + + /** + * Returns workflow config + * + * @param resource + * + * @return + */ + public WorkflowConfig getWorkflowConfig(String resource) { + return _workflowConfigMap.get(resource); + } + + /** + * Return the JobContext by resource name + * + * @param resourceName + * + * @return + */ + public JobContext getJobContext(String resourceName) { + if (_contextMap.containsKey(resourceName) && _contextMap.get(resourceName) != null) { + return new JobContext(_contextMap.get(resourceName)); + } + return null; + } + + /** + * Return the WorkflowContext by resource name + * + * @param resourceName + * + * @return + */ + public WorkflowContext getWorkflowContext(String resourceName) { + if (_contextMap.containsKey(resourceName) && _contextMap.get(resourceName) != null) { + return new WorkflowContext(_contextMap.get(resourceName)); + } + return null; + } + + /** + * Update context of the Job + */ + public void updateJobContext(String resourceName, JobContext jobContext, + HelixDataAccessor accessor) { + updateContext(resourceName, jobContext.getRecord(), accessor); + } + + /** + * Update context of the Workflow + */ + public void updateWorkflowContext(String resourceName, WorkflowContext workflowContext, + HelixDataAccessor accessor) { + updateContext(resourceName, workflowContext.getRecord(), accessor); + } + + /** + * Update context of the Workflow or Job + */ + private void updateContext(String resourceName, ZNRecord record, HelixDataAccessor accessor) { 
+ String path = String.format("/%s/%s%s/%s/%s", _clusterName, PropertyType.PROPERTYSTORE.name(), + TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, TaskConstants.CONTEXT_NODE); + accessor.getBaseDataAccessor().set(path, record, AccessOption.PERSISTENT); + _contextMap.put(resourceName, record); + } + + /** + * Return map of WorkflowContexts or JobContexts + * + * @return + */ + public Map<String, ZNRecord> getContexts() { + return _contextMap; + } + + @Override public String toString() { + return "TaskDataCache{" + + "_jobConfigMap=" + _jobConfigMap + + ", _workflowConfigMap=" + _workflowConfigMap + + ", _contextMap=" + _contextMap + + ", _clusterName='" + _clusterName + '\'' + + '}'; + } +} + http://git-wip-us.apache.org/repos/asf/helix/blob/ae02b58d/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java index 47413fc..4fa0c8c 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java +++ b/helix-core/src/main/java/org/apache/helix/controller/stages/ClusterDataCache.java @@ -19,24 +19,23 @@ package org.apache.helix.controller.stages; * under the License. 
*/ +import com.google.common.collect.Maps; import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; import java.util.HashSet; -import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutorService; - -import org.apache.helix.AccessOption; import org.apache.helix.HelixDataAccessor; -import org.apache.helix.HelixProperty; import org.apache.helix.PropertyKey; import org.apache.helix.PropertyKey.Builder; -import org.apache.helix.PropertyType; import org.apache.helix.ZNRecord; +import org.apache.helix.common.caches.CurrentStateCache; +import org.apache.helix.common.caches.InstanceMessagesCache; +import org.apache.helix.common.caches.TaskDataCache; import org.apache.helix.model.ClusterConfig; import org.apache.helix.model.ClusterConstraints; import org.apache.helix.model.ClusterConstraints.ConstraintType; @@ -60,11 +59,6 @@ import org.apache.helix.task.WorkflowContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import com.google.common.base.Joiner; -import com.google.common.collect.Lists; -import com.google.common.collect.Maps; -import com.google.common.collect.Sets; - import static org.apache.helix.HelixConstants.ChangeType; /** @@ -87,26 +81,21 @@ public class ClusterDataCache { private Map<String, ResourceConfig> _resourceConfigMap; private Map<String, ResourceConfig> _resourceConfigCacheMap; private Map<String, ClusterConstraints> _constraintMap; - private Map<String, Map<String, Map<String, CurrentState>>> _currentStateMap; - private Map<String, Map<String, Message>> _messageMap; private Map<String, Map<String, String>> _idealStateRuleMap; private Map<String, Map<String, Long>> _missingTopStateMap = new HashMap<>(); - private Map<String, JobConfig> _jobConfigMap = new HashMap<>(); - private Map<String, WorkflowConfig> _workflowConfigMap = new HashMap<>(); - private Map<String, ZNRecord> _contextMap = new 
HashMap<>(); - private Map<String, ExternalView> _targetExternalViewMap = Maps.newHashMap(); + private Map<String, ExternalView> _targetExternalViewMap = new HashMap<>(); - // maintain a cache of participant messages across pipeline runs - private Map<String, Map<String, Message>> _messageCache = Maps.newHashMap(); - private Map<PropertyKey, CurrentState> _currentStateCache = Maps.newHashMap(); + private CurrentStateCache _currentStateCache; + private TaskDataCache _taskDataCache; + private InstanceMessagesCache _instanceMessagesCache; // maintain a cache of bestPossible assignment across pipeline runs // TODO: this is only for customRebalancer, remove it and merge it with _idealMappingCache. - private Map<String, ResourceAssignment> _resourceAssignmentCache = Maps.newHashMap(); + private Map<String, ResourceAssignment> _resourceAssignmentCache = new HashMap<>(); // maintain a cache of idealmapping (preference list) for full-auto resource across pipeline runs - private Map<String, ZNRecord> _idealMappingCache = Maps.newHashMap(); + private Map<String, ZNRecord> _idealMappingCache = new HashMap<>(); private Map<ChangeType, Boolean> _propertyDataChangedMap; @@ -122,15 +111,18 @@ public class ClusterDataCache { private String _clusterName; public ClusterDataCache () { + this(null); + } + + public ClusterDataCache(String clusterName) { _propertyDataChangedMap = new ConcurrentHashMap<>(); - for(ChangeType type : ChangeType.values()) { + for (ChangeType type : ChangeType.values()) { _propertyDataChangedMap.put(type, Boolean.valueOf(true)); } - } - - public ClusterDataCache (String clusterName) { - this(); _clusterName = clusterName; + _currentStateCache = new CurrentStateCache(_clusterName); + _taskDataCache = new TaskDataCache(_clusterName); + _instanceMessagesCache = new InstanceMessagesCache(_clusterName); } /** @@ -177,18 +169,17 @@ public class ClusterDataCache { LOG.debug("Reload ResourceConfigs: " + _resourceConfigCacheMap.size()); } - _idealStateMap = 
Maps.newHashMap(_idealStateCacheMap); - _liveInstanceMap = Maps.newHashMap(_liveInstanceCacheMap); + _idealStateMap = new HashMap<>(_idealStateCacheMap); + _liveInstanceMap = new HashMap(_liveInstanceCacheMap); _instanceConfigMap = new ConcurrentHashMap<>(_instanceConfigCacheMap); - _resourceConfigMap = Maps.newHashMap(_resourceConfigCacheMap); + _resourceConfigMap = new HashMap(_resourceConfigCacheMap); if (_updateInstanceOfflineTime) { updateOfflineInstanceHistory(accessor); } if (_isTaskCache) { - refreshJobContexts(accessor); - updateWorkflowJobConfigs(); + _taskDataCache.refresh(accessor, _resourceConfigMap); } Map<String, StateModelDefinition> stateDefMap = @@ -197,17 +188,20 @@ public class ClusterDataCache { _constraintMap = accessor.getChildValuesMap(keyBuilder.constraints()); _clusterConfig = accessor.getProperty(keyBuilder.clusterConfig()); - refreshMessages(accessor); - refreshCurrentStates(accessor); + _instanceMessagesCache + .refresh(accessor, _liveInstanceMap); + _currentStateCache.refresh(accessor, _liveInstanceMap); + // current state must be refreshed before refreshing relay messages // because we need to use current state to validate all relay messages. 
- updateRelayMessages(_messageMap); + _instanceMessagesCache + .updateRelayMessages(_liveInstanceMap, _currentStateCache.getCurrentStatesMap()); if (_clusterConfig != null) { _idealStateRuleMap = _clusterConfig.getIdealStateRules(); } else { - _idealStateRuleMap = Maps.newHashMap(); + _idealStateRuleMap = new HashMap(); LOG.warn("Cluster config is null!"); } @@ -229,7 +223,7 @@ public class ClusterDataCache { LOG.debug("ResourceConfigs: " + _resourceConfigMap.keySet()); LOG.debug("InstanceConfigs: " + _instanceConfigMap.keySet()); LOG.debug("ClusterConfigs: " + _clusterConfig); - LOG.debug("JobContexts: " + _contextMap.keySet()); + LOG.debug("JobContexts: " + _taskDataCache.getContexts().keySet()); } if (LOG.isTraceEnabled()) { @@ -239,215 +233,6 @@ public class ClusterDataCache { return true; } - private void refreshMessages(HelixDataAccessor accessor) { - long start = System.currentTimeMillis(); - Builder keyBuilder = accessor.keyBuilder(); - - Map<String, Map<String, Message>> msgMap = new HashMap<>(); - List<PropertyKey> newMessageKeys = Lists.newLinkedList(); - long purgeSum = 0; - for (String instanceName : _liveInstanceMap.keySet()) { - // get the cache - Map<String, Message> cachedMap = _messageCache.get(instanceName); - if (cachedMap == null) { - cachedMap = Maps.newHashMap(); - _messageCache.put(instanceName, cachedMap); - } - msgMap.put(instanceName, cachedMap); - - // get the current names - Set<String> messageNames = - Sets.newHashSet(accessor.getChildNames(keyBuilder.messages(instanceName))); - - long purgeStart = System.currentTimeMillis(); - // clear stale names - Iterator<String> cachedNamesIter = cachedMap.keySet().iterator(); - while (cachedNamesIter.hasNext()) { - String messageName = cachedNamesIter.next(); - if (!messageNames.contains(messageName)) { - cachedNamesIter.remove(); - } - } - long purgeEnd = System.currentTimeMillis(); - purgeSum += purgeEnd - purgeStart; - - // get the keys for the new messages - for (String messageName : 
messageNames) { - if (!cachedMap.containsKey(messageName)) { - newMessageKeys.add(keyBuilder.message(instanceName, messageName)); - } - } - } - - // get the new messages - if (newMessageKeys.size() > 0) { - List<Message> newMessages = accessor.getProperty(newMessageKeys); - for (Message message : newMessages) { - if (message != null) { - Map<String, Message> cachedMap = _messageCache.get(message.getTgtName()); - cachedMap.put(message.getId(), message); - } - } - } - - _messageMap = Collections.unmodifiableMap(msgMap); - - if (LOG.isDebugEnabled()) { - LOG.debug("Message purge took: " + purgeSum); - LOG.debug("# of Messages read from ZooKeeper " + newMessageKeys.size() + ". took " + ( - System.currentTimeMillis() - start) + " ms."); - } - } - - // update all valid relay messages attached to existing state transition messages into message map. - private void updateRelayMessages(Map<String, Map<String, Message>> messageMap) { - List<Message> relayMessages = new ArrayList<>(); - for (String instance : messageMap.keySet()) { - Map<String, Message> instanceMessages = messageMap.get(instance); - Map<String, Map<String, CurrentState>> instanceCurrentStateMap = _currentStateMap.get(instance); - if (instanceCurrentStateMap == null) { - continue; - } - - for (Message message : instanceMessages.values()) { - if (message.hasRelayMessages()) { - String sessionId = message.getTgtSessionId(); - String resourceName = message.getResourceName(); - String partitionName = message.getPartitionName(); - String targetState = message.getToState(); - String instanceSessionId = _liveInstanceMap.get(instance).getSessionId(); - - if (!instanceSessionId.equals(sessionId)) { - continue; - } - - Map<String, CurrentState> sessionCurrentStateMap = instanceCurrentStateMap.get(sessionId); - if (sessionCurrentStateMap == null) { - continue; - } - CurrentState currentState = sessionCurrentStateMap.get(resourceName); - if (currentState == null || 
!targetState.equals(currentState.getState(partitionName))) { - continue; - } - long transitionCompleteTime = currentState.getEndTime(partitionName); - - for (Message msg : message.getRelayMessages().values()) { - msg.setRelayTime(transitionCompleteTime); - if (!message.isExpired()) { - relayMessages.add(msg); - } - } - } - } - } - - for (Message message : relayMessages) { - String instance = message.getTgtName(); - Map<String, Message> instanceMessages = messageMap.get(instance); - if (instanceMessages == null) { - instanceMessages = new HashMap<>(); - messageMap.put(instance, instanceMessages); - } - instanceMessages.put(message.getId(), message); - } - } - - private void refreshCurrentStates(HelixDataAccessor accessor) { - refreshCurrentStatesCache(accessor); - - Map<String, Map<String, Map<String, CurrentState>>> allCurStateMap = new HashMap<>(); - for (PropertyKey key : _currentStateCache.keySet()) { - CurrentState currentState = _currentStateCache.get(key); - String[] params = key.getParams(); - if (currentState != null && params.length >= 4) { - String instanceName = params[1]; - String sessionId = params[2]; - String stateName = params[3]; - Map<String, Map<String, CurrentState>> instanceCurStateMap = - allCurStateMap.get(instanceName); - if (instanceCurStateMap == null) { - instanceCurStateMap = Maps.newHashMap(); - allCurStateMap.put(instanceName, instanceCurStateMap); - } - Map<String, CurrentState> sessionCurStateMap = instanceCurStateMap.get(sessionId); - if (sessionCurStateMap == null) { - sessionCurStateMap = Maps.newHashMap(); - instanceCurStateMap.put(sessionId, sessionCurStateMap); - } - sessionCurStateMap.put(stateName, currentState); - } - } - - for (String instance : allCurStateMap.keySet()) { - allCurStateMap.put(instance, Collections.unmodifiableMap(allCurStateMap.get(instance))); - } - _currentStateMap = Collections.unmodifiableMap(allCurStateMap); - } - - // reload current states that has been changed from zk to local cache. 
- private void refreshCurrentStatesCache(HelixDataAccessor accessor) { - long start = System.currentTimeMillis(); - Builder keyBuilder = accessor.keyBuilder(); - - List<PropertyKey> currentStateKeys = Lists.newLinkedList(); - for (String instanceName : _liveInstanceMap.keySet()) { - LiveInstance liveInstance = _liveInstanceMap.get(instanceName); - String sessionId = liveInstance.getSessionId(); - List<String> currentStateNames = - accessor.getChildNames(keyBuilder.currentStates(instanceName, sessionId)); - for (String currentStateName : currentStateNames) { - currentStateKeys.add(keyBuilder.currentState(instanceName, sessionId, currentStateName)); - } - } - - // All new entries from zk not cached locally yet should be read from ZK. - List<PropertyKey> reloadKeys = Lists.newLinkedList(currentStateKeys); - reloadKeys.removeAll(_currentStateCache.keySet()); - - List<PropertyKey> cachedKeys = Lists.newLinkedList(_currentStateCache.keySet()); - cachedKeys.retainAll(currentStateKeys); - - List<HelixProperty.Stat> stats = accessor.getPropertyStats(cachedKeys); - Map<PropertyKey, CurrentState> currentStatesMap = Maps.newHashMap(); - for (int i=0; i < cachedKeys.size(); i++) { - PropertyKey key = cachedKeys.get(i); - HelixProperty.Stat stat = stats.get(i); - if (stat != null) { - CurrentState property = _currentStateCache.get(key); - if (property != null && property.getBucketSize() == 0 && property.getStat().equals(stat)) { - currentStatesMap.put(key, property); - } else { - // need update from zk - reloadKeys.add(key); - } - } else { - LOG.debug("stat is null for key: " + key); - reloadKeys.add(key); - } - } - - List<CurrentState> currentStates = accessor.getProperty(reloadKeys); - Iterator<PropertyKey> csKeyIter = reloadKeys.iterator(); - for (CurrentState currentState : currentStates) { - PropertyKey key = csKeyIter.next(); - if (currentState != null) { - currentStatesMap.put(key, currentState); - } else { - LOG.debug("CurrentState null for key: " + key); - } - } - - 
_currentStateCache = Collections.unmodifiableMap(currentStatesMap); - - if (LOG.isDebugEnabled()) { - LOG.debug("# of CurrentState paths read from ZooKeeper " + reloadKeys.size()); - LOG.debug( - "# of CurrentState paths skipped reading from ZK: " + (currentStateKeys.size() - reloadKeys.size())); - } - LOG.info( - "Takes " + (System.currentTimeMillis() - start) + " ms to reload new current states!"); - } - private void updateOfflineInstanceHistory(HelixDataAccessor accessor) { List<String> offlineNodes = new ArrayList<>(_instanceConfigMap.keySet()); offlineNodes.removeAll(_liveInstanceMap.keySet()); @@ -502,7 +287,7 @@ public class ClusterDataCache { } public synchronized void setIdealStates(List<IdealState> idealStates) { - Map<String, IdealState> idealStateMap = Maps.newHashMap(); + Map<String, IdealState> idealStateMap = new HashMap(); for (IdealState idealState : idealStates) { idealStateMap.put(idealState.getId(), idealState); } @@ -588,7 +373,7 @@ public class ClusterDataCache { public synchronized void setLiveInstances(List<LiveInstance> liveInstances) { - Map<String, LiveInstance> liveInstanceMap = Maps.newHashMap(); + Map<String, LiveInstance> liveInstanceMap = new HashMap(); for (LiveInstance liveInstance : liveInstances) { liveInstanceMap.put(liveInstance.getId(), liveInstance); } @@ -597,18 +382,16 @@ public class ClusterDataCache { } /** - * Provides the current state of the node for a given session id, - * the sessionid can be got from LiveInstance + * Provides the current state of the node for a given session id, the sessionid can be got from + * LiveInstance + * * @param instanceName * @param clientSessionId + * * @return */ public Map<String, CurrentState> getCurrentState(String instanceName, String clientSessionId) { - if (!_currentStateMap.containsKey(instanceName) - || !_currentStateMap.get(instanceName).containsKey(clientSessionId)) { - return Collections.emptyMap(); - } - return _currentStateMap.get(instanceName).get(clientSessionId); + return 
_currentStateCache.getCurrentState(instanceName, clientSessionId); } /** @@ -617,26 +400,11 @@ public class ClusterDataCache { * @return */ public Map<String, Message> getMessages(String instanceName) { - Map<String, Message> map = _messageMap.get(instanceName); - if (map != null) { - return map; - } else { - return Collections.emptyMap(); - } + return _instanceMessagesCache.getMessages(instanceName); } public void cacheMessages(List<Message> messages) { - for (Message message : messages) { - String instanceName = message.getTgtName(); - Map<String, Message> instMsgMap; - if (_messageCache.containsKey(instanceName)) { - instMsgMap = _messageCache.get(instanceName); - } else { - instMsgMap = Maps.newHashMap(); - _messageCache.put(instanceName, instMsgMap); - } - instMsgMap.put(message.getId(), message); - } + _instanceMessagesCache.cacheMessages(messages); } /** @@ -721,7 +489,7 @@ public class ClusterDataCache { * @return */ public Map<String, JobConfig> getJobConfigMap() { - return _jobConfigMap; + return _taskDataCache.getJobConfigMap(); } /** @@ -730,7 +498,7 @@ public class ClusterDataCache { * @return */ public JobConfig getJobConfig(String resource) { - return _jobConfigMap.get(resource); + return _taskDataCache.getJobConfig(resource); } /** @@ -738,7 +506,7 @@ public class ClusterDataCache { * @return */ public Map<String, WorkflowConfig> getWorkflowConfigMap() { - return _workflowConfigMap; + return _taskDataCache.getWorkflowConfigMap(); } /** @@ -747,12 +515,12 @@ public class ClusterDataCache { * @return */ public WorkflowConfig getWorkflowConfig(String resource) { - return _workflowConfigMap.get(resource); + return _taskDataCache.getWorkflowConfig(resource); } public synchronized void setInstanceConfigs(List<InstanceConfig> instanceConfigs) { - Map<String, InstanceConfig> instanceConfigMap = Maps.newHashMap(); + Map<String, InstanceConfig> instanceConfigMap = new HashMap(); for (InstanceConfig instanceConfig : instanceConfigs) { 
instanceConfigMap.put(instanceConfig.getId(), instanceConfig); } @@ -881,10 +649,7 @@ public class ClusterDataCache { * @return */ public JobContext getJobContext(String resourceName) { - if (_contextMap.containsKey(resourceName) && _contextMap.get(resourceName) != null) { - return new JobContext(_contextMap.get(resourceName)); - } - return null; + return _taskDataCache.getJobContext(resourceName); } /** @@ -893,10 +658,7 @@ public class ClusterDataCache { * @return */ public WorkflowContext getWorkflowContext(String resourceName) { - if (_contextMap.containsKey(resourceName) && _contextMap.get(resourceName) != null) { - return new WorkflowContext(_contextMap.get(resourceName)); - } - return null; + return _taskDataCache.getWorkflowContext(resourceName); } /** @@ -904,7 +666,7 @@ public class ClusterDataCache { */ public void updateJobContext(String resourceName, JobContext jobContext, HelixDataAccessor accessor) { - updateContext(resourceName, jobContext.getRecord(), accessor); + _taskDataCache.updateJobContext(resourceName, jobContext, accessor); } /** @@ -912,17 +674,7 @@ public class ClusterDataCache { */ public void updateWorkflowContext(String resourceName, WorkflowContext workflowContext, HelixDataAccessor accessor) { - updateContext(resourceName, workflowContext.getRecord(), accessor); - } - - /** - * Update context of the Workflow or Job - */ - private void updateContext(String resourceName, ZNRecord record, HelixDataAccessor accessor) { - String path = String.format("/%s/%s%s/%s/%s", _clusterName, PropertyType.PROPERTYSTORE.name(), - TaskConstants.REBALANCER_CONTEXT_ROOT, resourceName, TaskConstants.CONTEXT_NODE); - accessor.getBaseDataAccessor().set(path, record, AccessOption.PERSISTENT); - _contextMap.put(resourceName, record); + _taskDataCache.updateWorkflowContext(resourceName, workflowContext, accessor); } /** @@ -930,7 +682,7 @@ public class ClusterDataCache { * @return */ public Map<String, ZNRecord> getContexts() { - return _contextMap; + return 
_taskDataCache.getContexts(); } public ExternalView getTargetExternalView(String resourceName) { @@ -1066,61 +818,11 @@ public class ClusterDataCache { sb.append("stateModelDefMap:" + _stateModelDefMap).append("\n"); sb.append("instanceConfigMap:" + _instanceConfigMap).append("\n"); sb.append("resourceConfigMap:" + _resourceConfigMap).append("\n"); - sb.append("jobContextMap:" + _contextMap).append("\n"); - sb.append("messageMap:" + _messageMap).append("\n"); - sb.append("currentStateMap:" + _currentStateMap).append("\n"); + sb.append("taskDataCache:" + _taskDataCache).append("\n"); + sb.append("messageCache:" + _instanceMessagesCache).append("\n"); + sb.append("currentStateCache:" + _currentStateCache).append("\n"); sb.append("clusterConfig:" + _clusterConfig).append("\n"); return sb.toString(); } - - private void refreshJobContexts(HelixDataAccessor accessor) { - // TODO: Need an optimize for reading context only if the refresh is needed. - long start = System.currentTimeMillis(); - _contextMap.clear(); - if (_clusterName == null) { - return; - } - String path = String.format("/%s/%s%s", _clusterName, PropertyType.PROPERTYSTORE.name(), - TaskConstants.REBALANCER_CONTEXT_ROOT); - List<String> contextPaths = new ArrayList<>(); - List<String> childNames = accessor.getBaseDataAccessor().getChildNames(path, 0); - if (childNames == null) { - return; - } - for (String context : childNames) { - contextPaths.add(Joiner.on("/").join(path, context, TaskConstants.CONTEXT_NODE)); - } - - List<ZNRecord> contexts = accessor.getBaseDataAccessor().get(contextPaths, null, 0); - for (int i = 0; i < contexts.size(); i++) { - ZNRecord context = contexts.get(i); - if (context != null && context.getSimpleField(NAME) != null) { - _contextMap.put(context.getSimpleField(NAME), context); - } else { - _contextMap.put(childNames.get(i), context); - LOG.debug( - String.format("Context for %s is null or miss the context NAME!", childNames.get((i)))); - } - } - - if (LOG.isDebugEnabled()) { - 
LOG.debug("# of workflow/job context read from zk: " + _contextMap.size() + ". Take " + ( - System.currentTimeMillis() - start) + " ms"); - } - } - - private void updateWorkflowJobConfigs() { - _workflowConfigMap.clear(); - _jobConfigMap.clear(); - for (Map.Entry<String, ResourceConfig> entry : _resourceConfigMap.entrySet()) { - if (entry.getValue().getRecord().getSimpleFields() - .containsKey(WorkflowConfig.WorkflowConfigProperty.Dag.name())) { - _workflowConfigMap.put(entry.getKey(), new WorkflowConfig(entry.getValue())); - } else if (entry.getValue().getRecord().getSimpleFields() - .containsKey(WorkflowConfig.WorkflowConfigProperty.WorkflowID.name())) { - _jobConfigMap.put(entry.getKey(), new JobConfig(entry.getValue())); - } - } - } } http://git-wip-us.apache.org/repos/asf/helix/blob/ae02b58d/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java index 5602333..332cd8a 100644 --- a/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java +++ b/helix-core/src/main/java/org/apache/helix/spectator/RoutingDataCache.java @@ -21,7 +21,7 @@ package org.apache.helix.spectator; import org.apache.helix.HelixConstants; import org.apache.helix.PropertyType; -import org.apache.helix.common.BasicClusterDataCache; +import org.apache.helix.common.caches.BasicClusterDataCache; /** * Cache the cluster data that are needed by RoutingTableProvider.
