This is an automated email from the ASF dual-hosted git repository. jiajunwang pushed a commit to branch helix-0.9.x-hotfix in repository https://gitbox.apache.org/repos/asf/helix.git
commit 1684a6e2d2bf48c1e05e623edb3fce5ea35ae86e Author: Jiajun Wang <[email protected]> AuthorDate: Thu May 14 17:34:22 2020 -0700 Enforce result check for data accessors batch get calls to prevent partial batch read. (#974) This will help to ensure the main Helix logic does not calculate based on incomplete input. --- .../java/org/apache/helix/BaseDataAccessor.java | 2 ++ .../java/org/apache/helix/HelixDataAccessor.java | 4 ++- .../apache/helix/common/caches/TaskDataCache.java | 4 +-- .../helix/controller/GenericHelixController.java | 9 +++--- .../apache/helix/manager/zk/CallbackHandler.java | 2 +- .../helix/manager/zk/ParticipantManager.java | 8 +++-- .../org/apache/helix/manager/zk/ZKHelixAdmin.java | 16 +++++----- .../helix/manager/zk/ZKHelixDataAccessor.java | 2 +- .../helix/manager/zk/ZkBaseDataAccessor.java | 2 ++ .../helix/manager/zk/ZkCacheBaseDataAccessor.java | 21 ++++++++----- .../apache/helix/messaging/CriteriaEvaluator.java | 11 +++++-- .../helix/messaging/DefaultMessagingService.java | 5 +++- .../messaging/handling/HelixTaskExecutor.java | 13 ++++++-- .../helix/monitoring/ZKPathDataDumpTask.java | 3 +- .../helix/store/zk/AutoFallbackPropertyStore.java | 20 ++++++------- .../java/org/apache/helix/task/TaskDriver.java | 2 +- .../org/apache/helix/task/WorkflowDispatcher.java | 2 +- .../helix/tools/ClusterExternalViewVerifier.java | 2 +- .../apache/helix/tools/ClusterStateVerifier.java | 9 ++++-- .../BestPossibleExternalViewVerifier.java | 6 ++-- .../StrictMatchExternalViewVerifier.java | 3 +- .../apache/helix/tools/commandtools/ZkCopy.java | 2 +- .../org/apache/helix/util/StatusUpdateUtil.java | 2 +- .../test/java/org/apache/helix/MockAccessor.java | 35 ++++++++++++---------- .../java/org/apache/helix/TestRoutingTable.java | 10 +++---- .../controller/stages/TestCustomizedViewStage.java | 0 .../controller/stages/TestExternalViewStage.java | 6 ++-- .../controller/stages/TestRebalancePipeline.java | 10 +++---- .../TestAddStateModelFactoryAfterConnect.java | 2 +- .../TestMessagePartitionStateMismatch.java | 2 +- .../TestNoThrottleDisabledPartitions.java | 9 ++++-- .../apache/helix/integration/TestStatusUpdate.java | 2 +- .../controller/TestTargetExternalView.java | 9 +++--- .../integration/messaging/TestMessageThrottle.java | 2 +- .../helix/integration/task/TaskTestUtil.java | 2 +- .../manager/zk/TestWtCacheAsyncOpSingleThread.java | 4 +-- .../helix/manager/zk/TestZkBaseDataAccessor.java | 2 +- .../manager/zk/TestZkCacheAsyncOpSingleThread.java | 4 +-- .../messaging/TestDefaultMessagingService.java | 3 +- .../messaging/handling/MockHelixTaskExecutor.java | 2 +- .../messaging/handling/TestHelixTaskExecutor.java | 12 +++++--- .../messaging/p2pMessage/TestP2PMessages.java | 6 ++-- .../apache/helix/mock/MockBaseDataAccessor.java | 14 +++++---- .../apache/helix/mock/MockZkHelixDataAccessor.java | 12 +++----- .../store/zk/TestAutoFallbackPropertyStore.java | 6 ++-- .../rest/server/service/ClusterServiceImpl.java | 2 +- .../rest/server/service/InstanceServiceImpl.java | 12 ++++---- .../rest/server/service/TestClusterService.java | 8 +++-- 48 files changed, 190 insertions(+), 136 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java index 7d024fa..1c44053 100644 --- a/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/BaseDataAccessor.java @@ -138,6 +138,7 @@ public interface BaseDataAccessor<T> { * @param options Set the type of ZNode see the valid values in {@link AccessOption} * @return List of record data stored at each ZNode */ + @Deprecated List<T> get(List<String> paths, List<Stat> stats, int options); /** @@ -165,6 +166,7 @@ public interface BaseDataAccessor<T> { * @param options Set the type of ZNode see the valid values in {@link AccessOption} * @return A list of children of the parent ZNode */ + @Deprecated List<T> getChildren(String parentPath, List<Stat> stats, int options); /** diff --git a/helix-core/src/main/java/org/apache/helix/HelixDataAccessor.java b/helix-core/src/main/java/org/apache/helix/HelixDataAccessor.java index d8dde1e..025cb2a 100644 --- a/helix-core/src/main/java/org/apache/helix/HelixDataAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/HelixDataAccessor.java @@ -83,6 +83,7 @@ public interface HelixDataAccessor { * @param keys * @return */ + @Deprecated <T extends HelixProperty> List<T> getProperty(List<PropertyKey> keys); /** @@ -134,6 +135,7 @@ public interface HelixDataAccessor { * @param key * @return subPropertyValues */ + @Deprecated <T extends HelixProperty> List<T> getChildValues(PropertyKey key); /** @@ -154,7 +156,7 @@ public interface HelixDataAccessor { * @param key * @return a map of property identifiers to typed properties */ - + @Deprecated <T extends HelixProperty> Map<String, T> getChildValuesMap(PropertyKey key); /** diff --git a/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java index 5c29124..c962607 100644 --- a/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java +++ b/helix-core/src/main/java/org/apache/helix/common/caches/TaskDataCache.java @@ -193,9 +193,9 @@ public class TaskDataCache extends AbstractDataCache { } } - List<ZNRecord> contexts = accessor.getBaseDataAccessor().get(contextPaths, null, 0); + List<ZNRecord> contexts = accessor.getBaseDataAccessor().get(contextPaths, null, 0, true); List<ZNRecord> prevAssignments = - accessor.getBaseDataAccessor().get(prevAssignmentPaths, null, 0); + accessor.getBaseDataAccessor().get(prevAssignmentPaths, null, 0, true); for (int i = 0; i < contexts.size(); i++) { ZNRecord context = contexts.get(i); diff --git a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java index b80b8cc..5b0410a 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java +++ b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java @@ -246,15 +246,14 @@ public class GenericHelixController implements IdealStateChangeListener, HelixDataAccessor accessor = _manager.getHelixDataAccessor(); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); - Map<String, LiveInstance> liveInstanceMap = - accessor.getChildValuesMap(keyBuilder.liveInstances()); + List<LiveInstance> liveInstances = + accessor.getChildValues(keyBuilder.liveInstances(), true); - if (liveInstanceMap != null && !liveInstanceMap.isEmpty()) { + if (liveInstances != null && !liveInstances.isEmpty()) { NotificationContext changeContext = new NotificationContext(_manager); changeContext.setType(NotificationContext.Type.CALLBACK); synchronized (_manager) { - checkLiveInstancesObservation(new ArrayList<>(liveInstanceMap.values()), - changeContext); + checkLiveInstancesObservation(liveInstances, changeContext); } } } diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java index 42afcd1..22b13cb 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/CallbackHandler.java @@ -442,7 +442,7 @@ public class CallbackHandler implements IZkChildListener, IZkDataListener { private <T extends HelixProperty> List<T> preFetch(PropertyKey key) { if (_preFetchEnabled) { - return _accessor.getChildValues(key); + return _accessor.getChildValues(key, true); } else { return Collections.emptyList(); } diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java index 76cb791..2fc0cf4 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ParticipantManager.java @@ -275,16 +275,18 @@ public class ParticipantManager { continue; } + // Ignore if any current states in the previous folder cannot be read. List<CurrentState> lastCurStates = - _dataAccessor.getChildValues(_keyBuilder.currentStates(_instanceName, session)); + _dataAccessor.getChildValues(_keyBuilder.currentStates(_instanceName, session), false); for (CurrentState lastCurState : lastCurStates) { LOG.info("Carrying over old session: " + session + ", resource: " + lastCurState.getId() + " to current session: " + _sessionId); String stateModelDefRef = lastCurState.getStateModelDefRef(); if (stateModelDefRef == null) { - LOG.error("skip carry-over because previous current state doesn't have a state model definition. previous current-state: " - + lastCurState); + LOG.error( + "skip carry-over because previous current state doesn't have a state model definition. previous current-state: " + + lastCurState); continue; } StateModelDefinition stateModel = diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java index 12ab636..af0035a 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java @@ -562,7 +562,7 @@ public class ZKHelixAdmin implements HelixAdmin { } // check there is no pending messages for the partitions exist - List<Message> messages = accessor.getChildValues(keyBuilder.messages(instanceName)); + List<Message> messages = accessor.getChildValues(keyBuilder.messages(instanceName), true); for (Message message : messages) { if (!MessageType.STATE_TRANSITION.name().equalsIgnoreCase(message.getMsgType()) || !sessionId .equals(message.getTgtSessionId()) || !resourceName.equals(message.getResourceName()) @@ -621,9 +621,9 @@ public class ZKHelixAdmin implements HelixAdmin { logger.info("Reset instances {} in cluster {}.", instanceNames == null ? "NULL" : HelixUtil.serializeByComma(instanceNames), clusterName); HelixDataAccessor accessor = - new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient)); - Builder keyBuilder = accessor.keyBuilder(); - List<ExternalView> extViews = accessor.getChildValues(keyBuilder.externalViews()); + new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_zkClient)); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); + List<ExternalView> extViews = accessor.getChildValues(keyBuilder.externalViews(), true); Set<String> resetInstanceNames = new HashSet<String>(instanceNames); for (String instanceName : resetInstanceNames) { @@ -649,9 +649,9 @@ public class ZKHelixAdmin implements HelixAdmin { logger.info("Reset resources {} in cluster {}.", resourceNames == null ? "NULL" : HelixUtil.serializeByComma(resourceNames), clusterName); HelixDataAccessor accessor = - new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<ZNRecord>(_zkClient)); - Builder keyBuilder = accessor.keyBuilder(); - List<ExternalView> extViews = accessor.getChildValues(keyBuilder.externalViews()); + new ZKHelixDataAccessor(clusterName, new ZkBaseDataAccessor<>(_zkClient)); + PropertyKey.Builder keyBuilder = accessor.keyBuilder(); + List<ExternalView> extViews = accessor.getChildValues(keyBuilder.externalViews(), true); Set<String> resetResourceNames = new HashSet<String>(resourceNames); for (ExternalView extView : extViews) { @@ -1559,7 +1559,7 @@ public class ZKHelixAdmin implements HelixAdmin { List<String> instances = new ArrayList<>(); String path = PropertyPathBuilder.instanceConfig(clusterName); BaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<>(_zkClient); - List<ZNRecord> znRecords = baseAccessor.getChildren(path, null, 0); + List<ZNRecord> znRecords = baseAccessor.getChildren(path, null, 0, 0, 0); for (ZNRecord record : znRecords) { if (record != null) { InstanceConfig instanceConfig = new InstanceConfig(record); diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java index 51f16ac..6640229 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java @@ -313,7 +313,7 @@ public class ZKHelixDataAccessor implements HelixDataAccessor { property.getRecord().getMapFields().clear(); property.getRecord().getListFields().clear(); - List<ZNRecord> childRecords = _baseDataAccessor.getChildren(path, null, options); + List<ZNRecord> childRecords = _baseDataAccessor.getChildren(path, null, options, 0, 0); ZNRecord assembledRecord = new ZNRecordAssembler().assemble(childRecords); // merge with parent node value diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java index a4dfe21..3e57b5d 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java @@ -329,6 +329,7 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { /** * async get */ + @Deprecated @Override public List<T> get(List<String> paths, List<Stat> stats, int options) { boolean[] needRead = new boolean[paths.size()]; @@ -426,6 +427,7 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { * The retryCount and retryInterval will be ignored. */ // TODO: Change the behavior of getChildren when Helix starts migrating API. + @Deprecated @Override public List<T> getChildren(String parentPath, List<Stat> stats, int options) { return getChildren(parentPath, stats, options, false); diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java index 67bf46e..a579707 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java @@ -564,6 +564,7 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> { return _baseAccessor.remove(serverPaths, options); } + @Deprecated @Override public List<T> get(List<String> paths, List<Stat> stats, int options) { return get(paths, stats, options, false); @@ -674,8 +675,20 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> { return _baseAccessor.getChildNames(serverParentPath, options); } + @Deprecated @Override public List<T> getChildren(String parentPath, List<Stat> stats, int options) { + return getChildren(parentPath, stats, options, false); + } + + @Override + public List<T> getChildren(String parentPath, List<Stat> stats, int options, int retryCount, + int retryInterval) throws HelixException { + // TODO add retry logic according to retryCount and retryInterval input + return getChildren(parentPath, stats, options, true); + } + + private List<T> getChildren(String parentPath, List<Stat> stats, int options, boolean throwException) { List<String> childNames = getChildNames(parentPath, options); if (childNames == null) { return null; @@ -687,13 +700,7 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> { paths.add(path); } - return get(paths, stats, options); - } - - @Override - public List<T> getChildren(String parentPath, List<Stat> stats, int options, int retryCount, - int retryInterval) throws HelixException { - return getChildren(parentPath, stats, options); + return get(paths, stats, options, throwException); } @Override diff --git a/helix-core/src/main/java/org/apache/helix/messaging/CriteriaEvaluator.java b/helix-core/src/main/java/org/apache/helix/messaging/CriteriaEvaluator.java index 32d6894..1499e53 100644 --- a/helix-core/src/main/java/org/apache/helix/messaging/CriteriaEvaluator.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/CriteriaEvaluator.java @@ -94,7 +94,11 @@ public class CriteriaEvaluator { List<ZNRecordRow> allRows = ZNRecordRow.flatten(HelixProperty.convertToList(properties)); // save the matches - Set<String> liveParticipants = accessor.getChildValuesMap(keyBuilder.liveInstances()).keySet(); + // TODO: Apply strict check on the getChildValuesMap() call. + // TODO: For backward compatibility, allow partial read for now. This may reduce the + // TODO: match result eventually. + Set<String> liveParticipants = + accessor.getChildValuesMap(keyBuilder.liveInstances(), false).keySet(); List<ZNRecordRow> result = Lists.newArrayList(); for (ZNRecordRow row : allRows) { // The participant instance name is stored in the return value of either getRecordId() or @@ -181,7 +185,10 @@ public class CriteriaEvaluator { PropertyKey propertyKeys, PropertyKey propertyKey, String dataType) { List<HelixProperty> properties; if (Strings.isNullOrEmpty(dataSpec) || dataSpec.equals(MATCH_ALL_SYM)) { - properties = accessor.getChildValues(propertyKeys); + // TODO: Apply strict check on the getChildValues() call. + // TODO: For backward compatibility, allow partial read for now. This may reduce the + // TODO: matches eventually. + properties = accessor.getChildValues(propertyKeys, false); } else { HelixProperty data = accessor.getProperty(propertyKey); if (data == null) { diff --git a/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java index 84aeafb..6785381 100644 --- a/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/DefaultMessagingService.java @@ -186,7 +186,10 @@ public class DefaultMessagingService implements ClusterMessagingService { Map<String, String> sessionIdMap = new HashMap<String, String>(); if (recipientCriteria.isSessionSpecific()) { Builder keyBuilder = targetDataAccessor.keyBuilder(); - List<LiveInstance> liveInstances = targetDataAccessor.getChildValues(keyBuilder.liveInstances()); + // For backward compatibility, allow partial read for the live instances. + // Note that this may cause the pending message to be sent with null target session Id. + List<LiveInstance> liveInstances = + targetDataAccessor.getChildValues(keyBuilder.liveInstances(), false); for (LiveInstance liveInstance : liveInstances) { sessionIdMap.put(liveInstance.getInstanceName(), liveInstance.getEphemeralOwner()); diff --git a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java index a0481e3..9388876 100644 --- a/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java +++ b/helix-core/src/main/java/org/apache/helix/messaging/handling/HelixTaskExecutor.java @@ -700,7 +700,15 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { } } - List<Message> newMessages = accessor.getProperty(keys); + /** + * Do not throw exception on partial message read. + * 1. There is no way to resolve the error on the participant side. And once it fails here, we + * are running the risk of ignoring the message change event. And the participant might be stuck. + * 2. Even this is a partial read, we have another chance to retry in the business logic since + * as long as the participant processes messages, it will touch the message folder and triggers + * another message event. + */ + List<Message> newMessages = accessor.getProperty(keys, false); // Message may be removed before get read, clean up null messages. Iterator<Message> messageIterator = newMessages.iterator(); while(messageIterator.hasNext()) { @@ -817,7 +825,8 @@ public class HelixTaskExecutor implements MessageListener, TaskExecutor { LOG.info(String.format("Controller received PARTICIPANT_SESSION_CHANGE msg from src: %s", message.getMsgSrc())); PropertyKey key = new Builder(manager.getClusterName()).liveInstances(); - List<LiveInstance> liveInstances = manager.getHelixDataAccessor().getChildValues(key); + List<LiveInstance> liveInstances = + manager.getHelixDataAccessor().getChildValues(key, true); _controller.onLiveInstanceChange(liveInstances, changeContext); reportAndRemoveMessage(message, accessor, instanceName, ProcessedMessageState.COMPLETED); continue; diff --git a/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java b/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java index 0b70e41..aef8acf 100644 --- a/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java +++ b/helix-core/src/main/java/org/apache/helix/monitoring/ZKPathDataDumpTask.java @@ -140,7 +140,8 @@ public class ZKPathDataDumpTask extends TimerTask { if (!dumpPaths.isEmpty()) { LOG.info("Dump statusUpdates and errors records for paths: " + dumpPaths); - List<ZNRecord> dumpRecords = accessor.get(dumpPaths, null, 0); + // No need to fail the batch read operation even it is partial result becuase it is for cleaning up. + List<ZNRecord> dumpRecords = accessor.get(dumpPaths, null, 0, false); for (ZNRecord record : dumpRecords) { if (record != null) { LOG.info(new String(_jsonSerializer.serialize(record))); diff --git a/helix-core/src/main/java/org/apache/helix/store/zk/AutoFallbackPropertyStore.java b/helix-core/src/main/java/org/apache/helix/store/zk/AutoFallbackPropertyStore.java index ca1174e..4f960d3 100644 --- a/helix-core/src/main/java/org/apache/helix/store/zk/AutoFallbackPropertyStore.java +++ b/helix-core/src/main/java/org/apache/helix/store/zk/AutoFallbackPropertyStore.java @@ -142,7 +142,7 @@ public class AutoFallbackPropertyStore<T> extends ZkHelixPropertyStore<T> { if (fallbackMap.size() > 0) { List<String> fallbackPaths = new ArrayList<String>(fallbackMap.keySet()); - List<T> fallbackValues = _fallbackStore.get(fallbackPaths, null, options); + List<T> fallbackValues = _fallbackStore.get(fallbackPaths, null, options, true); boolean createSucceed[] = super.createChildren(fallbackPaths, fallbackValues, AccessOption.PERSISTENT); @@ -217,24 +217,23 @@ public class AutoFallbackPropertyStore<T> extends ZkHelixPropertyStore<T> { } @Override - public List<T> get(List<String> paths, List<Stat> stats, int options) { + public List<T> get(List<String> paths, List<Stat> stats, int options, boolean throwException) { if (_fallbackStore == null) { - return super.get(paths, stats, options); + return super.get(paths, stats, options, throwException); } else { - List<T> values = super.get(paths, stats, options); - - Map<String, Integer> fallbackMap = new HashMap<String, Integer>(); + List<T> values = super.get(paths, stats, options, throwException); + Map<String, Integer> fallbackMap = new HashMap<>(); for (int i = 0; i < paths.size(); i++) { T value = values.get(i); if (value == null) { fallbackMap.put(paths.get(i), i); } } - if (fallbackMap.size() > 0) { - List<String> fallbackPaths = new ArrayList<String>(fallbackMap.keySet()); - List<Stat> fallbackStats = new ArrayList<Stat>(); - List<T> fallbackValues = _fallbackStore.get(fallbackPaths, fallbackStats, options); + List<String> fallbackPaths = new ArrayList<>(fallbackMap.keySet()); + List<Stat> fallbackStats = new ArrayList<>(); + List<T> fallbackValues = + _fallbackStore.get(fallbackPaths, fallbackStats, options, throwException); for (int i = 0; i < fallbackPaths.size(); i++) { String fallbackPath = fallbackPaths.get(i); int j = fallbackMap.get(fallbackPath); @@ -244,7 +243,6 @@ public class AutoFallbackPropertyStore<T> extends ZkHelixPropertyStore<T> { } } } - return values; } } diff --git a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java index dfca045..58de717 100644 --- a/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java +++ b/helix-core/src/main/java/org/apache/helix/task/TaskDriver.java @@ -851,7 +851,7 @@ public class TaskDriver { public Map<String, WorkflowConfig> getWorkflows() { Map<String, WorkflowConfig> workflowConfigMap = new HashMap<>(); Map<String, ResourceConfig> resourceConfigMap = - _accessor.getChildValuesMap(_accessor.keyBuilder().resourceConfigs()); + _accessor.getChildValuesMap(_accessor.keyBuilder().resourceConfigs(), true); for (Map.Entry<String, ResourceConfig> resource : resourceConfigMap.entrySet()) { try { diff --git a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java index 88f2168..2a8a3aa 100644 --- a/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java +++ b/helix-core/src/main/java/org/apache/helix/task/WorkflowDispatcher.java @@ -484,7 +484,7 @@ public class WorkflowDispatcher extends AbstractTaskDispatcher { HelixDataAccessor accessor = manager.getHelixDataAccessor(); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); Map<String, HelixProperty> resourceConfigMap = - accessor.getChildValuesMap(keyBuilder.resourceConfigs()); + accessor.getChildValuesMap(keyBuilder.resourceConfigs(), true); if (!resourceConfigMap.containsKey(origWorkflowName)) { LOG.error("No such workflow named " + origWorkflowName); return null; diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java index 43e98f4..9a13648 100644 --- a/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java +++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterExternalViewVerifier.java @@ -155,7 +155,7 @@ public class ClusterExternalViewVerifier extends ClusterVerifier { BestPossibleStateOutput bestPossbileStates = calculateBestPossibleState(cache); Map<String, ExternalView> externalViews = - _accessor.getChildValuesMap(_keyBuilder.externalViews()); + _accessor.getChildValuesMap(_keyBuilder.externalViews(), true); // TODO all ideal-states should be included in external-views diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java index 6d71c04..ebe44cb 100644 --- a/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java +++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterStateVerifier.java @@ -211,7 +211,8 @@ public class ClusterStateVerifier { } } - Map<String, ExternalView> extViews = accessor.getChildValuesMap(keyBuilder.externalViews()); + Map<String, ExternalView> extViews = + accessor.getChildValuesMap(keyBuilder.externalViews(), true); if (extViews == null) { extViews = Collections.emptyMap(); } @@ -469,13 +470,15 @@ public class ClusterStateVerifier { private boolean verifyMasterNbInExtView(HelixDataAccessor accessor) { Builder keyBuilder = accessor.keyBuilder(); - Map<String, IdealState> idealStates = accessor.getChildValuesMap(keyBuilder.idealStates()); + Map<String, IdealState> idealStates = + accessor.getChildValuesMap(keyBuilder.idealStates(), true); if (idealStates == null || idealStates.size() == 0) { LOG.info("No resource idealState"); return true; } - Map<String, ExternalView> extViews = accessor.getChildValuesMap(keyBuilder.externalViews()); + Map<String, ExternalView> extViews = + accessor.getChildValuesMap(keyBuilder.externalViews(), true); if (extViews == null || extViews.size() < idealStates.size()) { LOG.info("No externalViews | externalView.size() < idealState.size()"); return false; diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java index 52ced19..850ecd1 100644 --- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java +++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/BestPossibleExternalViewVerifier.java @@ -208,12 +208,14 @@ public class BestPossibleExternalViewVerifier extends ZkHelixClusterVerifier { if (_expectLiveInstances != null && !_expectLiveInstances.isEmpty()) { Set<String> actualLiveNodes = _dataProvider.getLiveInstances().keySet(); if (!_expectLiveInstances.equals(actualLiveNodes)) { - LOG.warn("Live instances are not as expected. Actual live nodes: " + actualLiveNodes.toString()); + LOG.warn("Live instances are not as expected. Actual live nodes: " + actualLiveNodes + .toString()); return false; } } - Map<String, ExternalView> extViews = _accessor.getChildValuesMap(keyBuilder.externalViews()); + Map<String, ExternalView> extViews = + _accessor.getChildValuesMap(keyBuilder.externalViews(), true); if (extViews == null) { extViews = Collections.emptyMap(); } diff --git a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java index 85f0397..f638c7d 100644 --- a/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java +++ b/helix-core/src/main/java/org/apache/helix/tools/ClusterVerifiers/StrictMatchExternalViewVerifier.java @@ -193,7 +193,8 @@ public class StrictMatchExternalViewVerifier extends ZkHelixClusterVerifier { } } - Map<String, ExternalView> extViews = _accessor.getChildValuesMap(keyBuilder.externalViews()); + Map<String, ExternalView> extViews = + _accessor.getChildValuesMap(keyBuilder.externalViews(), true); if (extViews == null) { extViews = Collections.emptyMap(); } diff --git a/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZkCopy.java b/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZkCopy.java index 805847c..9cf18cc 100644 --- a/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZkCopy.java +++ b/helix-core/src/main/java/org/apache/helix/tools/commandtools/ZkCopy.java @@ -108,7 +108,7 @@ public class ZkCopy { readPaths.add(concatenate(srcRootPath, path)); } List<Stat> stats = new ArrayList<Stat>(); - List<Object> readData = srcAccessor.get(readPaths, stats, 0); + List<Object> readData = srcAccessor.get(readPaths, stats, 0, true); List<String> writePaths = new ArrayList<String>(); List<Object> writeData = new ArrayList<Object>(); diff --git a/helix-core/src/main/java/org/apache/helix/util/StatusUpdateUtil.java b/helix-core/src/main/java/org/apache/helix/util/StatusUpdateUtil.java index b7a422a..c6d85fe 100644 --- a/helix-core/src/main/java/org/apache/helix/util/StatusUpdateUtil.java +++ b/helix-core/src/main/java/org/apache/helix/util/StatusUpdateUtil.java @@ -132,7 +132,7 @@ public class StatusUpdateUtil { Builder keyBuilder = accessor.keyBuilder(); List<ZNRecord> instances = - HelixProperty.convertToList(accessor.getChildValues(keyBuilder.instanceConfigs())); + HelixProperty.convertToList(accessor.getChildValues(keyBuilder.instanceConfigs(), true)); List<ZNRecord> partitionRecords = new ArrayList<ZNRecord>(); for (ZNRecord znRecord : instances) { String instanceName = znRecord.getId(); diff --git a/helix-core/src/test/java/org/apache/helix/MockAccessor.java b/helix-core/src/test/java/org/apache/helix/MockAccessor.java index 75f6bbd..d78363c 100644 --- a/helix-core/src/test/java/org/apache/helix/MockAccessor.java +++ b/helix-core/src/test/java/org/apache/helix/MockAccessor.java @@ -184,26 +184,29 @@ public class MockAccessor implements HelixDataAccessor { } @SuppressWarnings("unchecked") - @Override public <T extends HelixProperty> List<T> getChildValues(PropertyKey propertyKey) { - String path = propertyKey.getPath(); // PropertyPathConfig.getPath(type, - List<ZNRecord> children = _baseDataAccessor.getChildren(path, null, 0); - return (List<T>) HelixProperty.convertToTypedList(propertyKey.getTypeClass(), children); + @Deprecated + @Override + public <T extends HelixProperty> List<T> getChildValues(PropertyKey propertyKey) { + return getChildValues(propertyKey, false); } - @Override public <T extends HelixProperty> List<T> getChildValues(PropertyKey key, - boolean throwException) { - return getChildValues(key); + @Override + public <T extends HelixProperty> List<T> getChildValues(PropertyKey key, boolean throwException) { + String path = key.getPath(); // PropertyPathConfig.getPath(type, + List<ZNRecord> children = _baseDataAccessor.getChildren(path, null, 0, 0, 0); + return (List<T>) HelixProperty.convertToTypedList(key.getTypeClass(), children); } + @Deprecated @Override public <T extends HelixProperty> Map<String, T> getChildValuesMap(PropertyKey key) { - List<T> list = getChildValues(key); - return HelixProperty.convertListToMap(list); + return getChildValuesMap(key, false); } @Override public <T extends HelixProperty> Map<String, T> getChildValuesMap(PropertyKey key, boolean throwException) { - return getChildValuesMap(key); + List<T> list = getChildValues(key, throwException); + return HelixProperty.convertListToMap(list); } @Override @@ -238,8 +241,15 @@ public class MockAccessor implements HelixDataAccessor { throw new HelixException("Method not implemented!"); } + @Deprecated @Override public <T extends HelixProperty> List<T> getProperty(List<PropertyKey> keys) { + return getProperty(keys, false); + } + + @Override + public <T extends HelixProperty> List<T> getProperty(List<PropertyKey> keys, + boolean throwException) { List<T> list = new ArrayList<T>(); for (PropertyKey key : keys) { @SuppressWarnings("unchecked") @@ -248,9 +258,4 @@ public class MockAccessor implements HelixDataAccessor { } return list; } - - @Override public <T extends HelixProperty> List<T> getProperty(List<PropertyKey> keys, - boolean throwException) { - return getProperty(keys); - } } diff --git a/helix-core/src/test/java/org/apache/helix/TestRoutingTable.java b/helix-core/src/test/java/org/apache/helix/TestRoutingTable.java index 631870e..d766d89 100644 --- a/helix-core/src/test/java/org/apache/helix/TestRoutingTable.java +++ b/helix-core/src/test/java/org/apache/helix/TestRoutingTable.java @@ -61,12 +61,12 @@ public class TestRoutingTable { _mockAccessor = new MockAccessor() { @SuppressWarnings("unchecked") @Override - public <T extends HelixProperty> List<T> getChildValues(PropertyKey key) - { + public <T extends HelixProperty> List<T> getChildValues(PropertyKey key, + boolean throwException) { PropertyType type = key.getType(); String[] keys = key.getParams(); - if (type == PropertyType.CONFIGS && keys != null && keys.length > 1 - && keys[1].equalsIgnoreCase(ConfigScopeProperty.PARTICIPANT.toString())) { + if (type == PropertyType.CONFIGS && keys != null && keys.length > 1 && keys[1] + .equalsIgnoreCase(ConfigScopeProperty.PARTICIPANT.toString())) { List<InstanceConfig> configs = new ArrayList<InstanceConfig>(); for (String instanceName : array) { InstanceConfig config = new InstanceConfig(instanceName); @@ -78,7 +78,7 @@ public class TestRoutingTable { return (List<T>) configs; } return Collections.emptyList(); - }; + } }; } return _mockAccessor; diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestCustomizedViewStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestCustomizedViewStage.java new file mode 100644 index 0000000..e69de29 diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestExternalViewStage.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestExternalViewStage.java index 96e334b..2dde5c0 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestExternalViewStage.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestExternalViewStage.java @@ -66,14 +66,14 @@ public class TestExternalViewStage extends ZkUnitTestBase { runStage(event, new CurrentStateComputationStage()); runStage(event, externalViewComputeStage); Assert.assertEquals(cache.getExternalViews().values(), - accessor.getChildValues(accessor.keyBuilder().externalViews())); + accessor.getChildValues(accessor.keyBuilder().externalViews(), true)); // Assure there is no external got updated List<ExternalView> oldExternalViews = - accessor.getChildValues(accessor.keyBuilder().externalViews()); + accessor.getChildValues(accessor.keyBuilder().externalViews(), true); runStage(event, externalViewComputeStage); List<ExternalView> newExternalViews = - accessor.getChildValues(accessor.keyBuilder().externalViews()); + accessor.getChildValues(accessor.keyBuilder().externalViews(), true); Assert.assertEquals(oldExternalViews, newExternalViews); for (int i = 0; i < oldExternalViews.size(); i++) { Assert.assertEquals(oldExternalViews.get(i).getStat().getVersion(), diff --git a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java index d90c228..2f92541 100644 --- a/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java +++ b/helix-core/src/test/java/org/apache/helix/controller/stages/TestRebalancePipeline.java @@ -177,8 +177,8 @@ public class TestRebalancePipeline extends ZkUnitTestBase { // Controller has timeout > 1sec, so after 1s, controller should not have GCed message Thread.sleep(1000); - Assert.assertEquals(accessor.getChildValues(keyBuilder.messages("localhost_0")).size(), 1); - Assert.assertEquals(accessor.getChildValues(keyBuilder.messages("localhost_1")).size(), 1); + Assert.assertEquals(accessor.getChildValues(keyBuilder.messages("localhost_0"), true).size(), 1); + Assert.assertEquals(accessor.getChildValues(keyBuilder.messages("localhost_1"), true).size(), 1); // After another purge delay, controller should cleanup messages and continue to rebalance Thread.sleep(msgPurgeDelay); @@ -187,8 +187,8 @@ public class TestRebalancePipeline extends ZkUnitTestBase { "SLAVE"); Thread.sleep(1000); - List<Message> host0Msg = accessor.getChildValues(keyBuilder.messages("localhost_0")); - List<Message> host1Msg = accessor.getChildValues(keyBuilder.messages("localhost_1")); + List<Message> host0Msg = accessor.getChildValues(keyBuilder.messages("localhost_0"), true); + List<Message> host1Msg = accessor.getChildValues(keyBuilder.messages("localhost_1"), true); List<Message> allMsgs = new ArrayList<>(host0Msg); allMsgs.addAll(host1Msg); Assert.assertEquals(allMsgs.size(), 1); @@ -299,7 +299,7 @@ public class TestRebalancePipeline extends ZkUnitTestBase { // message, make sure controller should not send O->DROPPED until O->S is done HelixAdmin admin = new ZKHelixAdmin(_gZkClient); admin.dropResource(clusterName, resourceName); - List<IdealState> idealStates = accessor.getChildValues(accessor.keyBuilder().idealStates()); + List<IdealState> idealStates = accessor.getChildValues(accessor.keyBuilder().idealStates(), true); cache.setIdealStates(idealStates); runPipeline(event, dataRefresh); diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java b/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java index b36738f..83b977e 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestAddStateModelFactoryAfterConnect.java @@ -96,7 +96,7 @@ public class TestAddStateModelFactoryAfterConnect extends ZkTestBase { totalMsgs = 0; for (int i = 0; i < n; i++) { List<Message> msgs = - accessor.getChildValues(keyBuilder.messages(participants[i].getInstanceName())); + accessor.getChildValues(keyBuilder.messages(participants[i].getInstanceName()), true); totalMsgs += msgs.size(); } diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestMessagePartitionStateMismatch.java b/helix-core/src/test/java/org/apache/helix/integration/TestMessagePartitionStateMismatch.java index 56b5ca0..20aba86 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestMessagePartitionStateMismatch.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestMessagePartitionStateMismatch.java @@ -44,7 +44,7 @@ public class TestMessagePartitionStateMismatch extends ZkStandAloneCMTestBase { Builder kb = accessor.keyBuilder(); ExternalView ev = accessor.getProperty(kb.externalView(TEST_DB)); Map<String, LiveInstance> liveinstanceMap = - accessor.getChildValuesMap(accessor.keyBuilder().liveInstances()); + accessor.getChildValuesMap(accessor.keyBuilder().liveInstances(), true); for (String instanceName : liveinstanceMap.keySet()) { String sessionid = liveinstanceMap.get(instanceName).getEphemeralOwner(); diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestNoThrottleDisabledPartitions.java b/helix-core/src/test/java/org/apache/helix/integration/TestNoThrottleDisabledPartitions.java index 50d789e..3b597d9 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestNoThrottleDisabledPartitions.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestNoThrottleDisabledPartitions.java @@ -89,7 +89,8 @@ public class TestNoThrottleDisabledPartitions extends ZkTestBase { if (liveInstance != null) { String sessionId = liveInstance.getEphemeralOwner(); List<CurrentState> currentStates = _accessor.getChildValues( - _accessor.keyBuilder().currentStates(_participants[0].getInstanceName(), sessionId)); + _accessor.keyBuilder().currentStates(_participants[0].getInstanceName(), sessionId), + true); for (CurrentState currentState : currentStates) { for (Map.Entry<String, String> partitionState : currentState.getPartitionStateMap() .entrySet()) { @@ -143,7 +144,8 @@ public class TestNoThrottleDisabledPartitions extends ZkTestBase { if (liveInstance != null) { String sessionId = liveInstance.getEphemeralOwner(); List<CurrentState> currentStates = _accessor.getChildValues( - _accessor.keyBuilder().currentStates(_participants[0].getInstanceName(), sessionId)); + _accessor.keyBuilder().currentStates(_participants[0].getInstanceName(), sessionId), + true); for (CurrentState currentState : currentStates) { for (Map.Entry<String, String> partitionState : currentState.getPartitionStateMap() .entrySet()) { @@ -202,7 +204,8 @@ public class TestNoThrottleDisabledPartitions extends ZkTestBase { if (liveInstance != null) { String sessionId = liveInstance.getEphemeralOwner(); List<CurrentState> currentStates = _accessor.getChildValues( - _accessor.keyBuilder().currentStates(_participants[0].getInstanceName(), sessionId)); + _accessor.keyBuilder().currentStates(_participants[0].getInstanceName(), sessionId), + true); for (CurrentState currentState : currentStates) { for (Map.Entry<String, String> partitionState : currentState.getPartitionStateMap() .entrySet()) { diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestStatusUpdate.java b/helix-core/src/test/java/org/apache/helix/integration/TestStatusUpdate.java index b7a6eb9..c933c0d 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestStatusUpdate.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestStatusUpdate.java @@ -40,7 +40,7 @@ public class TestStatusUpdate extends ZkStandAloneCMTestBase { new ZKHelixDataAccessor(CLUSTER_NAME, new ZkBaseDataAccessor(_gZkClient)); Builder keyBuilder = accessor.keyBuilder(); - List<ExternalView> extViews = accessor.getChildValues(keyBuilder.externalViews()); + List<ExternalView> extViews = accessor.getChildValues(keyBuilder.externalViews(), true); Assert.assertNotNull(extViews); for (ExternalView extView : extViews) { diff --git a/helix-core/src/test/java/org/apache/helix/integration/controller/TestTargetExternalView.java b/helix-core/src/test/java/org/apache/helix/integration/controller/TestTargetExternalView.java index e9d6d8a..8f69bff 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/controller/TestTargetExternalView.java +++ b/helix-core/src/test/java/org/apache/helix/integration/controller/TestTargetExternalView.java @@ -67,8 +67,9 @@ public class TestTargetExternalView extends TaskTestBase { _accessor.getChildNames(_accessor.keyBuilder().targetExternalViews()).size(), 3); List<ExternalView> targetExternalViews = - _accessor.getChildValues(_accessor.keyBuilder().externalViews()); - List<IdealState> idealStates = _accessor.getChildValues(_accessor.keyBuilder().idealStates()); + _accessor.getChildValues(_accessor.keyBuilder().externalViews(), true); + List<IdealState> idealStates = + _accessor.getChildValues(_accessor.keyBuilder().idealStates(), true); for (int i = 0; i < idealStates.size(); i++) { Assert.assertEquals(targetExternalViews.get(i).getRecord().getMapFields(), @@ -83,8 +84,8 @@ public class TestTargetExternalView extends TaskTestBase { Assert.assertTrue(verifier.verifyByPolling()); Thread.sleep(1000); - targetExternalViews = _accessor.getChildValues(_accessor.keyBuilder().externalViews()); - idealStates = _accessor.getChildValues(_accessor.keyBuilder().idealStates()); + targetExternalViews = _accessor.getChildValues(_accessor.keyBuilder().externalViews(), true); + idealStates = _accessor.getChildValues(_accessor.keyBuilder().idealStates(), true); for (int i = 0; i < idealStates.size(); i++) { Assert.assertEquals(targetExternalViews.get(i).getRecord().getMapFields(), diff --git a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestMessageThrottle.java b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestMessageThrottle.java index a1f71dc..b66483e 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/messaging/TestMessageThrottle.java +++ b/helix-core/src/test/java/org/apache/helix/integration/messaging/TestMessageThrottle.java @@ -93,7 +93,7 @@ public class TestMessageThrottle extends ZkTestBase { throws Exception { if (currentChilds != null && currentChilds.size() > 1) { List<ZNRecord> records = - accessor.getBaseDataAccessor().getChildren(parentPath, null, 0); + accessor.getBaseDataAccessor().getChildren(parentPath, null, 0, 0, 0); int transitionMsgCount = 0; for (ZNRecord record : records) { Message msg = new Message(record); diff --git a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java index 170c55a..9f6be04 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java +++ b/helix-core/src/test/java/org/apache/helix/integration/task/TaskTestUtil.java @@ -342,7 +342,7 @@ public class TaskTestUtil { long startTime = System.currentTimeMillis(); while (true) { - List<Message> messages = accessor.getChildValues(propertyKey); + List<Message> messages = accessor.getChildValues(propertyKey, true); if (allTasksBlock(messages, numTask)) { return true; } else if (startTime + timeout < System.currentTimeMillis()) { diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestWtCacheAsyncOpSingleThread.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestWtCacheAsyncOpSingleThread.java index 0c2c8b5..e38c2a5 100644 --- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestWtCacheAsyncOpSingleThread.java +++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestWtCacheAsyncOpSingleThread.java @@ -131,14 +131,14 @@ public class TestWtCacheAsyncOpSingleThread extends ZkUnitTestBase { paths.add(path); } - records = accessor.get(paths, null, 0); + records = accessor.get(paths, null, 0, true); for (int i = 0; i < 10; i++) { Assert.assertEquals(records.get(i).getId(), "TestDB" + i); } // getChildren records.clear(); - records = accessor.getChildren(extViewPath, null, 0); + records = accessor.getChildren(extViewPath, null, 0, 0, 0); for (int i = 0; i < 10; i++) { Assert.assertEquals(records.get(i).getId(), "TestDB" + i); } diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java index 671ce80..1254377 100644 --- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java +++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkBaseDataAccessor.java @@ -457,7 +457,7 @@ public class TestZkBaseDataAccessor extends ZkUnitTestBase { // test async getChildren String parentPath = PropertyPathBuilder.instanceMessage(root, "host_1"); - records = accessor.getChildren(parentPath, null, 0); + records = accessor.getChildren(parentPath, null, 0, 0, 0); for (int i = 0; i < 10; i++) { String msgId = "msg_" + i; ZNRecord record = records.get(i); diff --git a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkCacheAsyncOpSingleThread.java b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkCacheAsyncOpSingleThread.java index 48e0998..11ebdfe 100644 --- a/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkCacheAsyncOpSingleThread.java +++ b/helix-core/src/test/java/org/apache/helix/manager/zk/TestZkCacheAsyncOpSingleThread.java @@ -279,14 +279,14 @@ public class TestZkCacheAsyncOpSingleThread extends ZkUnitTestBase { paths.add(path); } - records = accessor.get(paths, null, 0); + records = accessor.get(paths, null, 0, true); for (int i = 0; i < 10; i++) { Assert.assertEquals(records.get(i).getId(), "TestDB" + i); } // getChildren records.clear(); - records = accessor.getChildren(extViewPath, null, 0); + records = accessor.getChildren(extViewPath, null, 0, 0, 0); for (int i = 0; i < 10; i++) { Assert.assertEquals(records.get(i).getId(), "TestDB" + i); } diff --git a/helix-core/src/test/java/org/apache/helix/messaging/TestDefaultMessagingService.java b/helix-core/src/test/java/org/apache/helix/messaging/TestDefaultMessagingService.java index 4fbf891..9df79e4 100644 --- a/helix-core/src/test/java/org/apache/helix/messaging/TestDefaultMessagingService.java +++ b/helix-core/src/test/java/org/apache/helix/messaging/TestDefaultMessagingService.java @@ -62,7 +62,8 @@ public class TestDefaultMessagingService { } @Override - public <T extends HelixProperty> List<T> getChildValues(PropertyKey key) { + public <T extends HelixProperty> List<T> getChildValues(PropertyKey key, + boolean throwException) { PropertyType type = key.getType(); List<T> result = new ArrayList<T>(); Class<? extends HelixProperty> clazz = key.getTypeClass(); diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/MockHelixTaskExecutor.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/MockHelixTaskExecutor.java index 207f74c..7eadfc0 100644 --- a/helix-core/src/test/java/org/apache/helix/messaging/handling/MockHelixTaskExecutor.java +++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/MockHelixTaskExecutor.java @@ -55,7 +55,7 @@ public class MockHelixTaskExecutor extends HelixTaskExecutor { HelixDataAccessor accessor = manager.getHelixDataAccessor(); PropertyKey.Builder keyBuilder = accessor.keyBuilder(); PropertyKey path = keyBuilder.currentStates(manager.getInstanceName(), manager.getSessionId()); - Map<String, CurrentState> currentStateMap = accessor.getChildValuesMap(path); + Map<String, CurrentState> currentStateMap = accessor.getChildValuesMap(path, true); Set<String> seenPartitions = new HashSet<>(); for (Message message : messages) { diff --git a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java index 23bab5d..be00740 100644 --- a/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java +++ b/helix-core/src/test/java/org/apache/helix/messaging/handling/TestHelixTaskExecutor.java @@ -345,7 +345,8 @@ public class TestHelixTaskExecutor { } AssertJUnit - .assertEquals(dataAccessor.getChildValues(keyBuilder.messages(instanceName)).size(), nMsgs); + .assertEquals(dataAccessor.getChildValues(keyBuilder.messages(instanceName), true).size(), + nMsgs); changeContext.setChangeType(HelixConstants.ChangeType.MESSAGE); executor.onMessage(instanceName, msgList, changeContext); @@ -353,7 +354,8 @@ public class TestHelixTaskExecutor { Thread.sleep(200); // only 1 message is left over - state transition takes 1sec - Assert.assertEquals(dataAccessor.getChildValues(keyBuilder.messages(instanceName)).size(), 1); + Assert.assertEquals(dataAccessor.getChildValues(keyBuilder.messages(instanceName), true).size(), + 1); // While a state transition message is going on, another state transition message for same // resource / partition comes in, it should be discarded by message handler @@ -363,10 +365,12 @@ public class TestHelixTaskExecutor { dataAccessor.setProperty(msgList.get(2).getKey(keyBuilder, instanceName), msgList.get(2)); executor.onMessage(instanceName, Arrays.asList(msgList.get(2)), changeContext); Thread.sleep(200); - Assert.assertEquals(dataAccessor.getChildValues(keyBuilder.messages(instanceName)).size(), 1); + Assert.assertEquals(dataAccessor.getChildValues(keyBuilder.messages(instanceName), true).size(), + 1); Thread.sleep(1000); - Assert.assertEquals(dataAccessor.getChildValues(keyBuilder.messages(instanceName)).size(), 0); + Assert.assertEquals(dataAccessor.getChildValues(keyBuilder.messages(instanceName), true).size(), + 0); System.out.println("END TestHelixTaskExecutor.testDuplicatedMessage()"); } diff --git a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java index 72f110c..bf57b85 100644 --- a/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java +++ b/helix-core/src/test/java/org/apache/helix/messaging/p2pMessage/TestP2PMessages.java @@ -306,7 +306,7 @@ public class TestP2PMessages extends BaseStageTest { */ private void handleMessage(String instance, String resource) { PropertyKey propertyKey = new PropertyKey.Builder(_clusterName).messages(instance); - List<Message> messages = accessor.getChildValues(propertyKey); + List<Message> messages = accessor.getChildValues(propertyKey, true); String session = _dataCache.getLiveInstances().get(instance).getEphemeralOwner(); for (Message m : messages) { @@ -391,7 +391,7 @@ public class TestP2PMessages extends BaseStageTest { private void cleanMessages(String instance) { PropertyKey propertyKey = new PropertyKey.Builder(_clusterName).messages(instance); - List<Message> messages = accessor.getChildValues(propertyKey); + List<Message> messages = accessor.getChildValues(propertyKey, true); for (Message m : messages) { accessor .removeProperty(new PropertyKey.Builder(_clusterName).message(instance, m.getMsgId())); @@ -399,7 +399,7 @@ public class TestP2PMessages extends BaseStageTest { } List<Message> getMessages(String instance) { - return accessor.getChildValues(new PropertyKey.Builder(_clusterName).messages(instance)); + return accessor.getChildValues(new PropertyKey.Builder(_clusterName).messages(instance), true); } private String getTopStateInstance(Map<String, String> instanceStateMap, String topState) { diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockBaseDataAccessor.java b/helix-core/src/test/java/org/apache/helix/mock/MockBaseDataAccessor.java index df468b3..93b7997 100644 --- a/helix-core/src/test/java/org/apache/helix/mock/MockBaseDataAccessor.java +++ b/helix-core/src/test/java/org/apache/helix/mock/MockBaseDataAccessor.java @@ -148,6 +148,7 @@ public class MockBaseDataAccessor implements BaseDataAccessor<ZNRecord> { return zNode != null ? zNode.getRecord() : null; } + @Deprecated @Override public List<ZNRecord> get(List<String> paths, List<Stat> stats, int options) { return get(paths, stats, options, false); @@ -164,8 +165,15 @@ public class MockBaseDataAccessor implements BaseDataAccessor<ZNRecord> { return records; } + @Deprecated @Override public List<ZNRecord> getChildren(String parentPath, List<Stat> stats, int options) { + return getChildren(parentPath, stats, options, 0, 0); + } + + @Override + public List<ZNRecord> getChildren(String parentPath, List<Stat> stats, int options, + int retryCount, int retryInterval) throws HelixException { List<ZNRecord> children = new ArrayList<>(); for (String key : _recordMap.keySet()) { if (key.startsWith(parentPath)) { @@ -187,12 +195,6 @@ public class MockBaseDataAccessor implements BaseDataAccessor<ZNRecord> { } @Override - public List<ZNRecord> getChildren(String parentPath, List<Stat> stats, int options, - int retryCount, int retryInterval) throws HelixException { - return getChildren(parentPath, stats, options); - } - - @Override public List<String> getChildNames(String parentPath, int options) { List<String> child = new ArrayList<>(); for (String key : _recordMap.keySet()) { diff --git a/helix-core/src/test/java/org/apache/helix/mock/MockZkHelixDataAccessor.java b/helix-core/src/test/java/org/apache/helix/mock/MockZkHelixDataAccessor.java index 0ea33c7..0584d63 100644 --- a/helix-core/src/test/java/org/apache/helix/mock/MockZkHelixDataAccessor.java +++ b/helix-core/src/test/java/org/apache/helix/mock/MockZkHelixDataAccessor.java @@ -17,13 +17,10 @@ public class MockZkHelixDataAccessor extends ZKHelixDataAccessor { super(clusterName, null, baseDataAccessor); } - + @Deprecated @Override public <T extends HelixProperty> List<T> getProperty(List<PropertyKey> keys) { - for (PropertyKey key : keys) { - addCount(key); - } - return super.getProperty(keys); + return getProperty(keys, false); } @Override @@ -40,11 +37,10 @@ public class MockZkHelixDataAccessor extends ZKHelixDataAccessor { return super.getProperty(key); } + @Deprecated @Override public <T extends HelixProperty> Map<String, T> getChildValuesMap(PropertyKey key) { - Map<String, T> map = super.getChildValuesMap(key); - addCount(key, map.keySet().size()); - return map; + return getChildValuesMap(key, false); } @Override diff --git a/helix-core/src/test/java/org/apache/helix/store/zk/TestAutoFallbackPropertyStore.java b/helix-core/src/test/java/org/apache/helix/store/zk/TestAutoFallbackPropertyStore.java index 19aa70c..ac02b3a 100644 --- a/helix-core/src/test/java/org/apache/helix/store/zk/TestAutoFallbackPropertyStore.java +++ b/helix-core/src/test/java/org/apache/helix/store/zk/TestAutoFallbackPropertyStore.java @@ -446,7 +446,7 @@ public class TestAutoFallbackPropertyStore extends ZkUnitTestBase { } // test multi-get - List<ZNRecord> records = store.get(paths, null, 0); + List<ZNRecord> records = store.get(paths, null, 0, true); Assert.assertNotNull(records); Assert.assertEquals(records.size(), 10); for (int i = 0; i < 10; i++) { @@ -554,7 +554,7 @@ public class TestAutoFallbackPropertyStore extends ZkUnitTestBase { } // test multi-get - List<ZNRecord> records = store.get(paths, null, 0); + List<ZNRecord> records = store.get(paths, null, 0, true); Assert.assertNotNull(records); Assert.assertEquals(records.size(), 20); for (int i = 0; i < 20; i++) { @@ -627,7 +627,7 @@ public class TestAutoFallbackPropertyStore extends ZkUnitTestBase { Assert.assertTrue(childs.contains(child)); } - List<ZNRecord> records = store.getChildren("/", null, 0); + List<ZNRecord> records = store.getChildren("/", null, 0, 0, 0); Assert.assertNotNull(records); Assert.assertEquals(records.size(), 20); for (int i = 0; i < 20; i++) { diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/service/ClusterServiceImpl.java b/helix-rest/src/main/java/org/apache/helix/rest/server/service/ClusterServiceImpl.java index 9f20a02..3bc9a82 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/service/ClusterServiceImpl.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/service/ClusterServiceImpl.java @@ -29,7 +29,7 @@ public class ClusterServiceImpl implements ClusterService { String zoneField = _configAccessor.getClusterConfig(cluster).getFaultZoneType(); PropertyKey.Builder keyBuilder = _dataAccessor.keyBuilder(); List<InstanceConfig> instanceConfigs = - _dataAccessor.getChildValues(keyBuilder.instanceConfigs()); + _dataAccessor.getChildValues(keyBuilder.instanceConfigs(), true); Map<String, List<ClusterTopology.Instance>> instanceMapByZone = new HashMap<>(); if (instanceConfigs != null && !instanceConfigs.isEmpty()) { for (InstanceConfig instanceConfig : instanceConfigs) { diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java index 9d855d8..a4580fc 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/service/InstanceServiceImpl.java @@ -234,13 +234,15 @@ public class InstanceServiceImpl implements InstanceService { Map<String, Map<String, Boolean>> allPartitionsHealthOnLiveInstance = _dataAccessor.getAllPartitionsHealthOnLiveInstance(restConfig, customPayLoads); List<ExternalView> externalViews = - _dataAccessor.getChildValues(_dataAccessor.keyBuilder().externalViews()); + _dataAccessor.getChildValues(_dataAccessor.keyBuilder().externalViews(), true); Map<String, StoppableCheck> instanceStoppableChecks = new HashMap<>(); for (String instanceName : instances) { - List<String> unHealthyPartitions = InstanceValidationUtil.perPartitionHealthCheck( - externalViews, allPartitionsHealthOnLiveInstance, instanceName, _dataAccessor); - StoppableCheck stoppableCheck = new StoppableCheck(unHealthyPartitions.isEmpty(), - unHealthyPartitions, StoppableCheck.Category.CUSTOM_PARTITION_CHECK); + List<String> unHealthyPartitions = InstanceValidationUtil + .perPartitionHealthCheck(externalViews, allPartitionsHealthOnLiveInstance, instanceName, + _dataAccessor); + StoppableCheck stoppableCheck = + new StoppableCheck(unHealthyPartitions.isEmpty(), unHealthyPartitions, + StoppableCheck.Category.CUSTOM_PARTITION_CHECK); instanceStoppableChecks.put(instanceName, stoppableCheck); } diff --git a/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestClusterService.java b/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestClusterService.java index a988326..f801a83 100644 --- a/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestClusterService.java +++ b/helix-rest/src/test/java/org/apache/helix/rest/server/service/TestClusterService.java @@ -30,7 +30,7 @@ public class TestClusterService { Mock mock = new Mock(); when(mock.dataAccessor.keyBuilder()).thenReturn(new PropertyKey.Builder(TEST_CLUSTER)); - when(mock.dataAccessor.getChildValues(any(PropertyKey.class))).thenReturn(instanceConfigs); + when(mock.dataAccessor.getChildValues(any(PropertyKey.class), anyBoolean())).thenReturn(instanceConfigs); ClusterTopology clusterTopology = mock.clusterService.getClusterTopology(TEST_CLUSTER); @@ -46,7 +46,8 @@ public class TestClusterService { Mock mock = new Mock(); when(mock.dataAccessor.keyBuilder()).thenReturn(new PropertyKey.Builder(TEST_CLUSTER)); - when(mock.dataAccessor.getChildValues(any(PropertyKey.class))).thenReturn(instanceConfigs); + when(mock.dataAccessor.getChildValues(any(PropertyKey.class), anyBoolean())) + .thenReturn(instanceConfigs); ClusterTopology clusterTopology = mock.clusterService.getClusterTopology(TEST_CLUSTER); @@ -64,7 +65,8 @@ public class TestClusterService { Mock mock = new Mock(); when(mock.dataAccessor.keyBuilder()).thenReturn(new PropertyKey.Builder(TEST_CLUSTER)); - when(mock.dataAccessor.getChildValues(any(PropertyKey.class))).thenReturn(instanceConfigs); + when(mock.dataAccessor.getChildValues(any(PropertyKey.class), anyBoolean())) + .thenReturn(instanceConfigs); ClusterTopology clusterTopology = mock.clusterService.getClusterTopology(TEST_CLUSTER);
