This is an automated email from the ASF dual-hosted git repository. jiajunwang pushed a commit to branch wagedRebalancer in repository https://gitbox.apache.org/repos/asf/helix.git
commit afa297fcae31abc07d5a1186bc5bd0f02a5b4f95 Author: Hunter Lee <[email protected]> AuthorDate: Thu Aug 15 14:33:02 2019 -0700 Add ChangeDetector interface and ResourceChangeDetector implementation (#388) Add ChangeDetector interface and ResourceChangeDetector implementation In order to efficiently react to changes happening to the cluster in the new WAGED rebalancer, a new component called ChangeDetector was added. Changelist: 1. Add ChangeDetector interface 2. Implement ResourceChangeDetector 3. Add ResourceChangeSnapshot, a wrapper for critical cluster metadata 4. Add an integration test, TestResourceChangeDetector --- .../controller/changedetector/ChangeDetector.java | 57 ++++ .../changedetector/ResourceChangeDetector.java | 158 +++++++++ .../changedetector/ResourceChangeSnapshot.java | 105 ++++++ .../ResourceControllerDataProvider.java | 33 +- .../changedetector/TestResourceChangeDetector.java | 357 +++++++++++++++++++++ 5 files changed, 705 insertions(+), 5 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/controller/changedetector/ChangeDetector.java b/helix-core/src/main/java/org/apache/helix/controller/changedetector/ChangeDetector.java new file mode 100644 index 0000000..fbe4afc --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/changedetector/ChangeDetector.java @@ -0,0 +1,57 @@ +package org.apache.helix.controller.changedetector; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Collection; +import org.apache.helix.HelixConstants; + +/** + * ChangeDetector interface that will be used to track deltas in the cluster from one pipeline run + * to another. The interface methods are designed to be flexible for both the resource pipeline and + * the task pipeline. + * TODO: Consider splitting this up into two different ChangeDetector interfaces: + * TODO: PropertyBasedChangeDetector and PathBasedChangeDetector. + */ +public interface ChangeDetector { + + /** + * Returns all types of changes detected. + * @return a collection of ChangeTypes + */ + Collection<HelixConstants.ChangeType> getChangeTypes(); + + /** + * Returns the names of items that changed based on the change type given. + * @return a collection of names of items that changed + */ + Collection<String> getChangesByType(HelixConstants.ChangeType changeType); + + /** + * Returns the names of items that were added based on the change type given. + * @return a collection of names of items that were added + */ + Collection<String> getAdditionsByType(HelixConstants.ChangeType changeType); + + /** + * Returns the names of items that were removed based on the change type given. 
+ * @return a collection of names of items that were removed + */ + Collection<String> getRemovalsByType(HelixConstants.ChangeType changeType); +} diff --git a/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java b/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java new file mode 100644 index 0000000..d65e609 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeDetector.java @@ -0,0 +1,158 @@ +package org.apache.helix.controller.changedetector; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import com.google.common.collect.Sets; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import org.apache.helix.HelixConstants; +import org.apache.helix.HelixException; +import org.apache.helix.HelixProperty; +import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; + +/** + * ResourceChangeDetector implements ChangeDetector. 
It caches resource-related metadata from + * Helix's main resource pipeline cache (DataProvider) and the computation results of change + * detection. + * WARNING: the methods of this class are not thread-safe. + */ +public class ResourceChangeDetector implements ChangeDetector { + + private ResourceChangeSnapshot _oldSnapshot; // snapshot for previous pipeline run + private ResourceChangeSnapshot _newSnapshot; // snapshot for this pipeline run + + // The following caches the computation results + private Map<HelixConstants.ChangeType, Collection<String>> _changedItems = new HashMap<>(); + private Map<HelixConstants.ChangeType, Collection<String>> _addedItems = new HashMap<>(); + private Map<HelixConstants.ChangeType, Collection<String>> _removedItems = new HashMap<>(); + + public ResourceChangeDetector() { + _newSnapshot = new ResourceChangeSnapshot(); + } + + /** + * Compare the underlying HelixProperty objects and produce a collection of names of changed + * properties. + * @return + */ + private Collection<String> getChangedItems(Map<String, ? extends HelixProperty> oldPropertyMap, + Map<String, ? extends HelixProperty> newPropertyMap) { + Collection<String> changedItems = new HashSet<>(); + oldPropertyMap.forEach((name, property) -> { + if (newPropertyMap.containsKey(name) + && !property.getRecord().equals(newPropertyMap.get(name).getRecord())) { + changedItems.add(name); + } + }); + return changedItems; + } + + /** + * Return a collection of names that are newly added. + * @return + */ + private Collection<String> getAddedItems(Map<String, ? extends HelixProperty> oldPropertyMap, + Map<String, ? extends HelixProperty> newPropertyMap) { + return Sets.difference(newPropertyMap.keySet(), oldPropertyMap.keySet()); + } + + /** + * Return a collection of names that were removed. + * @return + */ + private Collection<String> getRemovedItems(Map<String, ? extends HelixProperty> oldPropertyMap, + Map<String, ? 
extends HelixProperty> newPropertyMap) { + return Sets.difference(oldPropertyMap.keySet(), newPropertyMap.keySet()); + } + + private void clearCachedComputation() { + _changedItems.clear(); + _addedItems.clear(); + _removedItems.clear(); + } + + /** + * Based on the change type given and propertyMap type, call the right getters for propertyMap. + * @param changeType + * @param snapshot + * @return + */ + private Map<String, ? extends HelixProperty> determinePropertyMapByType( + HelixConstants.ChangeType changeType, ResourceChangeSnapshot snapshot) { + switch (changeType) { + case INSTANCE_CONFIG: + return snapshot.getInstanceConfigMap(); + case IDEAL_STATE: + return snapshot.getIdealStateMap(); + case RESOURCE_CONFIG: + return snapshot.getResourceConfigMap(); + case LIVE_INSTANCE: + return snapshot.getLiveInstances(); + default: + throw new HelixException(String.format( + "ResourceChangeDetector cannot compute the names of changes for the given ChangeType: %s", + changeType)); + } + } + + /** + * Makes the current newSnapshot the oldSnapshot and reads in the up-to-date snapshot for change + * computation. To be called in the controller pipeline. 
+ * @param dataProvider newly refreshed DataProvider (cache) + */ + public synchronized void updateSnapshots(ResourceControllerDataProvider dataProvider) { + // If there are changes, update internal states + _oldSnapshot = new ResourceChangeSnapshot(_newSnapshot); + _newSnapshot = new ResourceChangeSnapshot(dataProvider); + dataProvider.clearRefreshedChangeTypes(); + + // Invalidate cached computation + clearCachedComputation(); + } + + @Override + public Collection<HelixConstants.ChangeType> getChangeTypes() { + return Collections.unmodifiableSet(_newSnapshot.getChangedTypes()); + } + + @Override + public Collection<String> getChangesByType(HelixConstants.ChangeType changeType) { + return _changedItems.computeIfAbsent(changeType, + changedItems -> getChangedItems(determinePropertyMapByType(changeType, _oldSnapshot), + determinePropertyMapByType(changeType, _newSnapshot))); + } + + @Override + public Collection<String> getAdditionsByType(HelixConstants.ChangeType changeType) { + return _addedItems.computeIfAbsent(changeType, + changedItems -> getAddedItems(determinePropertyMapByType(changeType, _oldSnapshot), + determinePropertyMapByType(changeType, _newSnapshot))); + } + + @Override + public Collection<String> getRemovalsByType(HelixConstants.ChangeType changeType) { + return _removedItems.computeIfAbsent(changeType, + changedItems -> getRemovedItems(determinePropertyMapByType(changeType, _oldSnapshot), + determinePropertyMapByType(changeType, _newSnapshot))); + } +} diff --git a/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeSnapshot.java b/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeSnapshot.java new file mode 100644 index 0000000..cbc3539 --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/controller/changedetector/ResourceChangeSnapshot.java @@ -0,0 +1,105 @@ +package org.apache.helix.controller.changedetector; + +/* + * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; +import org.apache.helix.HelixConstants; +import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; +import org.apache.helix.model.IdealState; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.model.LiveInstance; +import org.apache.helix.model.ResourceConfig; + +/** + * ResourceChangeSnapshot is a POJO that contains the following Helix metadata: + * 1. InstanceConfig + * 2. IdealState + * 3. ResourceConfig + * 4. LiveInstance + * 5. Changed property types + * It serves as a snapshot of the main controller cache to enable the difference (change) + * calculation between two rounds of the pipeline run. + */ +class ResourceChangeSnapshot { + + private Set<HelixConstants.ChangeType> _changedTypes; + private Map<String, InstanceConfig> _instanceConfigMap; + private Map<String, IdealState> _idealStateMap; + private Map<String, ResourceConfig> _resourceConfigMap; + private Map<String, LiveInstance> _liveInstances; + + /** + * Default constructor that constructs an empty snapshot. 
+ */ + ResourceChangeSnapshot() { + _changedTypes = new HashSet<>(); + _instanceConfigMap = new HashMap<>(); + _idealStateMap = new HashMap<>(); + _resourceConfigMap = new HashMap<>(); + _liveInstances = new HashMap<>(); + } + + /** + * Constructor using controller cache (ResourceControllerDataProvider). + * @param dataProvider + */ + ResourceChangeSnapshot(ResourceControllerDataProvider dataProvider) { + _changedTypes = new HashSet<>(dataProvider.getRefreshedChangeTypes()); + _instanceConfigMap = new HashMap<>(dataProvider.getInstanceConfigMap()); + _idealStateMap = new HashMap<>(dataProvider.getIdealStates()); + _resourceConfigMap = new HashMap<>(dataProvider.getResourceConfigMap()); + _liveInstances = new HashMap<>(dataProvider.getLiveInstances()); + } + + /** + * Copy constructor for ResourceChangeCache. + * @param cache + */ + ResourceChangeSnapshot(ResourceChangeSnapshot cache) { + _changedTypes = new HashSet<>(cache._changedTypes); + _instanceConfigMap = new HashMap<>(cache._instanceConfigMap); + _idealStateMap = new HashMap<>(cache._idealStateMap); + _resourceConfigMap = new HashMap<>(cache._resourceConfigMap); + _liveInstances = new HashMap<>(cache._liveInstances); + } + + Set<HelixConstants.ChangeType> getChangedTypes() { + return _changedTypes; + } + + Map<String, InstanceConfig> getInstanceConfigMap() { + return _instanceConfigMap; + } + + Map<String, IdealState> getIdealStateMap() { + return _idealStateMap; + } + + Map<String, ResourceConfig> getResourceConfigMap() { + return _resourceConfigMap; + } + + Map<String, LiveInstance> getLiveInstances() { + return _liveInstances; + } +} diff --git a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java index b1dc215..9e1550a 100644 --- a/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java +++ 
b/helix-core/src/main/java/org/apache/helix/controller/dataproviders/ResourceControllerDataProvider.java @@ -25,6 +25,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; import org.apache.helix.HelixConstants; import org.apache.helix.HelixDataAccessor; import org.apache.helix.PropertyKey; @@ -64,6 +65,9 @@ public class ResourceControllerDataProvider extends BaseControllerDataProvider { private Map<String, Map<String, MissingTopStateRecord>> _missingTopStateMap; private Map<String, Map<String, String>> _lastTopStateLocationMap; + // Maintain a set of all ChangeTypes for change detection + private Set<HelixConstants.ChangeType> _refreshedChangeTypes; + public ResourceControllerDataProvider() { this(AbstractDataCache.UNKNOWN_CLUSTER); } @@ -106,19 +110,21 @@ public class ResourceControllerDataProvider extends BaseControllerDataProvider { _idealMappingCache = new HashMap<>(); _missingTopStateMap = new HashMap<>(); _lastTopStateLocationMap = new HashMap<>(); + _refreshedChangeTypes = ConcurrentHashMap.newKeySet(); } public synchronized void refresh(HelixDataAccessor accessor) { long startTime = System.currentTimeMillis(); // Refresh base - Set<HelixConstants.ChangeType> propertyRefreshed = super.doRefresh(accessor); + Set<HelixConstants.ChangeType> changedTypes = super.doRefresh(accessor); + _refreshedChangeTypes.addAll(changedTypes); // Invalidate cached information if any of the important data has been refreshed - if (propertyRefreshed.contains(HelixConstants.ChangeType.IDEAL_STATE) - || propertyRefreshed.contains(HelixConstants.ChangeType.LIVE_INSTANCE) - || propertyRefreshed.contains(HelixConstants.ChangeType.INSTANCE_CONFIG) - || propertyRefreshed.contains(HelixConstants.ChangeType.RESOURCE_CONFIG)) { + if (changedTypes.contains(HelixConstants.ChangeType.IDEAL_STATE) + || changedTypes.contains(HelixConstants.ChangeType.LIVE_INSTANCE) + || 
changedTypes.contains(HelixConstants.ChangeType.INSTANCE_CONFIG) + || changedTypes.contains(HelixConstants.ChangeType.RESOURCE_CONFIG)) { clearCachedResourceAssignments(); } @@ -261,6 +267,23 @@ public class ResourceControllerDataProvider extends BaseControllerDataProvider { _idealMappingCache.put(resource, mapping); } + /** + * Return the set of all PropertyTypes that changed prior to this round of rebalance. The caller + * should clear this set by calling {@link #clearRefreshedChangeTypes()}. + * @return + */ + public Set<HelixConstants.ChangeType> getRefreshedChangeTypes() { + return _refreshedChangeTypes; + } + + /** + * Clears the set of all PropertyTypes that changed. The caller will have consumed all change + * types by calling {@link #getRefreshedChangeTypes()}. + */ + public void clearRefreshedChangeTypes() { + _refreshedChangeTypes.clear(); + } + public void clearCachedResourceAssignments() { _resourceAssignmentCache.clear(); _idealMappingCache.clear(); diff --git a/helix-core/src/test/java/org/apache/helix/controller/changedetector/TestResourceChangeDetector.java b/helix-core/src/test/java/org/apache/helix/controller/changedetector/TestResourceChangeDetector.java new file mode 100644 index 0000000..3ef41e4 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/controller/changedetector/TestResourceChangeDetector.java @@ -0,0 +1,357 @@ +package org.apache.helix.controller.changedetector; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.Collection; +import org.apache.helix.AccessOption; +import org.apache.helix.HelixConstants.ChangeType; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.PropertyKey; +import org.apache.helix.TestHelper; +import org.apache.helix.common.ZkTestBase; +import org.apache.helix.controller.dataproviders.ResourceControllerDataProvider; +import org.apache.helix.integration.manager.ClusterControllerManager; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.manager.zk.ZKHelixDataAccessor; +import org.apache.helix.model.ClusterConfig; +import org.apache.helix.model.InstanceConfig; +import org.apache.helix.model.ResourceConfig; +import org.testng.Assert; +import org.testng.annotations.AfterClass; +import org.testng.annotations.BeforeClass; +import org.testng.annotations.Test; + +/** + * This test contains a series of unit tests for ResourceChangeDetector. 
+ */ +public class TestResourceChangeDetector extends ZkTestBase { + + // All possible change types for ResourceChangeDetector except for ClusterConfig + // since we don't provide the names of changed fields for ClusterConfig + private static final ChangeType[] RESOURCE_CHANGE_TYPES = { + ChangeType.IDEAL_STATE, ChangeType.INSTANCE_CONFIG, ChangeType.LIVE_INSTANCE, + ChangeType.RESOURCE_CONFIG + }; + + private static final String CLUSTER_NAME = TestHelper.getTestClassName(); + private static final String RESOURCE_NAME = "TestDB"; + private static final String NEW_RESOURCE_NAME = "TestDB2"; + private static final String STATE_MODEL = "MasterSlave"; + // There are 5 possible change types for ResourceChangeDetector + private static final int NUM_CHANGE_TYPES = 5; + private static final int NUM_RESOURCES = 1; + private static final int NUM_PARTITIONS = 10; + private static final int NUM_REPLICAS = 3; + private static final int NUM_NODES = 5; + + // Create a mock of ResourceControllerDataProvider so that we could manipulate it + private ResourceControllerDataProvider _dataProvider; + private ResourceChangeDetector _resourceChangeDetector; + private ClusterControllerManager _controller; + private MockParticipantManager[] _participants = new MockParticipantManager[NUM_NODES]; + private HelixDataAccessor _dataAccessor; + private PropertyKey.Builder _keyBuilder; + + @BeforeClass + public void beforeClass() throws Exception { + super.beforeClass(); + + // Set up a mock cluster + TestHelper.setupCluster(CLUSTER_NAME, ZK_ADDR, 12918, // participant port + "localhost", // participant name prefix + RESOURCE_NAME, // resource name prefix + NUM_RESOURCES, // resources + NUM_PARTITIONS, // partitions per resource + NUM_NODES, // nodes + NUM_REPLICAS, // replicas + STATE_MODEL, true); // do rebalance + + // Start a controller + _controller = new ClusterControllerManager(ZK_ADDR, CLUSTER_NAME, "controller_0"); + _controller.syncStart(); + + // Start Participants + for (int i = 0; i < 
NUM_NODES; i++) { + String instanceName = "localhost_" + (12918 + i); + _participants[i] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, instanceName); + _participants[i].syncStart(); + } + + _dataAccessor = new ZKHelixDataAccessor(CLUSTER_NAME, _baseAccessor); + _keyBuilder = _dataAccessor.keyBuilder(); + _resourceChangeDetector = new ResourceChangeDetector(); + + // Create a custom data provider + _dataProvider = new ResourceControllerDataProvider(CLUSTER_NAME); + } + + @AfterClass + public void afterClass() throws Exception { + for (MockParticipantManager participant : _participants) { + if (participant != null && participant.isConnected()) { + participant.syncStop(); + } + } + _controller.syncStop(); + deleteCluster(CLUSTER_NAME); + Assert.assertFalse(TestHelper.verify(() -> _dataAccessor.getBaseDataAccessor() + .exists("/" + CLUSTER_NAME, AccessOption.PERSISTENT), 20000L)); + } + + /** + * Tests the initialization of the change detector. It should tell us that there's been changes + * for every change type and for all items per type. + * @throws Exception + */ + @Test + public void testResourceChangeDetectorInit() throws Exception { + _dataProvider.refresh(_dataAccessor); + _resourceChangeDetector.updateSnapshots(_dataProvider); + + Collection<ChangeType> changeTypes = _resourceChangeDetector.getChangeTypes(); + Assert.assertEquals(changeTypes.size(), NUM_CHANGE_TYPES, + "Not all change types have been detected for ResourceChangeDetector!"); + + // Check that the right amount of resources show up as added + checkDetectionCounts(ChangeType.IDEAL_STATE, NUM_RESOURCES, 0, 0); + + // Check that the right amount of instances show up as added + checkDetectionCounts(ChangeType.LIVE_INSTANCE, NUM_NODES, 0, 0); + checkDetectionCounts(ChangeType.INSTANCE_CONFIG, NUM_NODES, 0, 0); + } + + /** + * Add a resource (IS and ResourceConfig) and see if the detector detects it. 
+ */ + @Test(dependsOnMethods = "testResourceChangeDetectorInit") + public void testAddResource() { + // Create an IS and ResourceConfig + _gSetupTool.getClusterManagementTool().addResource(CLUSTER_NAME, NEW_RESOURCE_NAME, + NUM_PARTITIONS, STATE_MODEL); + ResourceConfig resourceConfig = new ResourceConfig(NEW_RESOURCE_NAME); + _dataAccessor.setProperty(_keyBuilder.resourceConfig(NEW_RESOURCE_NAME), resourceConfig); + // Manually notify dataProvider + _dataProvider.notifyDataChange(ChangeType.IDEAL_STATE); + _dataProvider.notifyDataChange(ChangeType.RESOURCE_CONFIG); + + // Refresh the data provider + _dataProvider.refresh(_dataAccessor); + + // Update the detector + _resourceChangeDetector.updateSnapshots(_dataProvider); + + checkChangeTypes(ChangeType.IDEAL_STATE, ChangeType.RESOURCE_CONFIG); + // Check the counts + for (ChangeType type : RESOURCE_CHANGE_TYPES) { + if (type == ChangeType.IDEAL_STATE || type == ChangeType.RESOURCE_CONFIG) { + checkDetectionCounts(type, 1, 0, 0); + } else { + checkDetectionCounts(type, 0, 0, 0); + } + } + // Check that detector gives the right item + Assert.assertTrue(_resourceChangeDetector.getAdditionsByType(ChangeType.RESOURCE_CONFIG) + .contains(NEW_RESOURCE_NAME)); + } + + /** + * Modify a resource config for the new resource and test that detector detects it. 
+ */ + @Test(dependsOnMethods = "testAddResource") + public void testModifyResource() { + // Modify resource config + ResourceConfig resourceConfig = + _dataAccessor.getProperty(_keyBuilder.resourceConfig(NEW_RESOURCE_NAME)); + resourceConfig.getRecord().setSimpleField("Did I change?", "Yes!"); + _dataAccessor.updateProperty(_keyBuilder.resourceConfig(NEW_RESOURCE_NAME), resourceConfig); + + // Notify data provider and check + _dataProvider.notifyDataChange(ChangeType.RESOURCE_CONFIG); + _dataProvider.refresh(_dataAccessor); + _resourceChangeDetector.updateSnapshots(_dataProvider); + + checkChangeTypes(ChangeType.RESOURCE_CONFIG); + // Check the counts + for (ChangeType type : RESOURCE_CHANGE_TYPES) { + if (type == ChangeType.RESOURCE_CONFIG) { + checkDetectionCounts(type, 0, 1, 0); + } else { + checkDetectionCounts(type, 0, 0, 0); + } + } + Assert.assertTrue(_resourceChangeDetector.getChangesByType(ChangeType.RESOURCE_CONFIG) + .contains(NEW_RESOURCE_NAME)); + } + + /** + * Delete the new resource and test that detector detects it. + */ + @Test(dependsOnMethods = "testModifyResource") + public void testDeleteResource() { + // Delete the newly added resource + _dataAccessor.removeProperty(_keyBuilder.idealStates(NEW_RESOURCE_NAME)); + _dataAccessor.removeProperty(_keyBuilder.resourceConfig(NEW_RESOURCE_NAME)); + + // Notify data provider and check + _dataProvider.notifyDataChange(ChangeType.IDEAL_STATE); + _dataProvider.notifyDataChange(ChangeType.RESOURCE_CONFIG); + _dataProvider.refresh(_dataAccessor); + _resourceChangeDetector.updateSnapshots(_dataProvider); + + checkChangeTypes(ChangeType.RESOURCE_CONFIG, ChangeType.IDEAL_STATE); + // Check the counts + for (ChangeType type : RESOURCE_CHANGE_TYPES) { + if (type == ChangeType.IDEAL_STATE || type == ChangeType.RESOURCE_CONFIG) { + checkDetectionCounts(type, 0, 0, 1); + } else { + checkDetectionCounts(type, 0, 0, 0); + } + } + } + + /** + * Disconnect and reconnect a Participant and see if detector detects. 
+ */ + @Test(dependsOnMethods = "testDeleteResource") + public void testDisconnectReconnectInstance() { + // Disconnect a Participant + _participants[0].syncStop(); + _dataProvider.notifyDataChange(ChangeType.LIVE_INSTANCE); + _dataProvider.refresh(_dataAccessor); + _resourceChangeDetector.updateSnapshots(_dataProvider); + + checkChangeTypes(ChangeType.LIVE_INSTANCE); + // Check the counts + for (ChangeType type : RESOURCE_CHANGE_TYPES) { + if (type == ChangeType.LIVE_INSTANCE) { + checkDetectionCounts(type, 0, 0, 1); + } else { + checkDetectionCounts(type, 0, 0, 0); + } + } + + // Reconnect the Participant + _participants[0] = new MockParticipantManager(ZK_ADDR, CLUSTER_NAME, "localhost_12918"); + _participants[0].syncStart(); + _dataProvider.notifyDataChange(ChangeType.LIVE_INSTANCE); + _dataProvider.refresh(_dataAccessor); + _resourceChangeDetector.updateSnapshots(_dataProvider); + + checkChangeTypes(ChangeType.LIVE_INSTANCE); + // Check the counts + for (ChangeType type : RESOURCE_CHANGE_TYPES) { + if (type == ChangeType.LIVE_INSTANCE) { + checkDetectionCounts(type, 1, 0, 0); + } else { + checkDetectionCounts(type, 0, 0, 0); + } + } + } + + /** + * Remove an instance completely and see if detector detects. 
+ */ + @Test(dependsOnMethods = "testDisconnectReconnectInstance") + public void testRemoveInstance() { + _participants[0].syncStop(); + InstanceConfig instanceConfig = + _dataAccessor.getProperty(_keyBuilder.instanceConfig(_participants[0].getInstanceName())); + _gSetupTool.getClusterManagementTool().dropInstance(CLUSTER_NAME, instanceConfig); + + _dataProvider.notifyDataChange(ChangeType.LIVE_INSTANCE); + _dataProvider.notifyDataChange(ChangeType.INSTANCE_CONFIG); + _dataProvider.refresh(_dataAccessor); + _resourceChangeDetector.updateSnapshots(_dataProvider); + + checkChangeTypes(ChangeType.LIVE_INSTANCE, ChangeType.INSTANCE_CONFIG); + // Check the counts + for (ChangeType type : RESOURCE_CHANGE_TYPES) { + if (type == ChangeType.LIVE_INSTANCE || type == ChangeType.INSTANCE_CONFIG) { + checkDetectionCounts(type, 0, 0, 1); + } else { + checkDetectionCounts(type, 0, 0, 0); + } + } + } + + /** + * Modify cluster config and see if detector detects. + */ + @Test(dependsOnMethods = "testRemoveInstance") + public void testModifyClusterConfig() { + // Modify cluster config + ClusterConfig clusterConfig = _dataAccessor.getProperty(_keyBuilder.clusterConfig()); + clusterConfig.setTopology("Change"); + _dataAccessor.updateProperty(_keyBuilder.clusterConfig(), clusterConfig); + + _dataProvider.notifyDataChange(ChangeType.CLUSTER_CONFIG); + _dataProvider.refresh(_dataAccessor); + _resourceChangeDetector.updateSnapshots(_dataProvider); + + checkChangeTypes(ChangeType.CLUSTER_CONFIG); + // Check the counts for other types + for (ChangeType type : RESOURCE_CHANGE_TYPES) { + checkDetectionCounts(type, 0, 0, 0); + } + } + + /** + * Test that change detector gives correct results when there are no changes after updating + * snapshots. 
+ */ + @Test(dependsOnMethods = "testModifyClusterConfig") + public void testNoChange() { + // Test twice to make sure that no change is stable across different runs + for (int i = 0; i < 2; i++) { + _dataProvider.refresh(_dataAccessor); + _resourceChangeDetector.updateSnapshots(_dataProvider); + + Assert.assertEquals(_resourceChangeDetector.getChangeTypes().size(), 0); + // Check the counts for all the other types + for (ChangeType type : RESOURCE_CHANGE_TYPES) { + checkDetectionCounts(type, 0, 0, 0); + } + } + } + + /** + * Check that the given change types appear in detector's change types. + * @param types + */ + private void checkChangeTypes(ChangeType... types) { + for (ChangeType type : types) { + Assert.assertTrue(_resourceChangeDetector.getChangeTypes().contains(type)); + } + } + + /** + * Convenience method for checking three types of detections. + * @param changeType + * @param additions + * @param changes + * @param deletions + */ + private void checkDetectionCounts(ChangeType changeType, int additions, int changes, + int deletions) { + Assert.assertEquals(_resourceChangeDetector.getAdditionsByType(changeType).size(), additions); + Assert.assertEquals(_resourceChangeDetector.getChangesByType(changeType).size(), changes); + Assert.assertEquals(_resourceChangeDetector.getRemovalsByType(changeType).size(), deletions); + } +}
