This is an automated email from the ASF dual-hosted git repository.
jxue pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 11b9fe851 Implement ViewClusterRefresher caching synchronization
(#2199)
11b9fe851 is described below
commit 11b9fe851c2711eee3810983723da197c685b427
Author: Qi (Quincy) Qu <[email protected]>
AuthorDate: Tue Aug 30 09:48:19 2022 -0700
Implement ViewClusterRefresher caching synchronization (#2199)
Implement ViewClusterRefresher caching synchronization
Implement a refresh mechanism that updates the view cluster's local cache from
the remote source-of-truth data when that data is changed externally.
---
.../helix/view/aggregator/HelixViewAggregator.java | 9 ++-
.../view/aggregator/ViewClusterRefresher.java | 85 ++++++++--------------
.../view/dataprovider/ViewClusterDataCache.java | 53 ++++++++++++++
.../view/aggregator/TestViewClusterRefresher.java | 3 +
.../dataprovider/TestViewClusterDataCache.java | 68 +++++++++++++++++
.../view/integration/TestHelixViewAggregator.java | 21 ++++++
6 files changed, 182 insertions(+), 57 deletions(-)
diff --git
a/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/HelixViewAggregator.java
b/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/HelixViewAggregator.java
index b1d34b9e2..4a44579cf 100644
---
a/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/HelixViewAggregator.java
+++
b/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/HelixViewAggregator.java
@@ -75,7 +75,7 @@ public class HelixViewAggregator implements
ClusterConfigChangeListener {
_viewClusterManager = HelixManagerFactory
.getZKHelixManager(_viewClusterName,
generateHelixManagerInstanceName(_viewClusterName),
InstanceType.SPECTATOR, zkAddr);
- _refreshViewCluster = new AtomicBoolean(false);
+ _refreshViewCluster = new AtomicBoolean(true);
_monitor = new ViewAggregatorMonitor(viewClusterName);
_aggregator = new DedupEventProcessor<ClusterViewEvent.Type,
ClusterViewEvent>(_viewClusterName,
"Aggregator") {
@@ -191,7 +191,11 @@ public class HelixViewAggregator implements
ClusterConfigChangeListener {
_refreshViewCluster.set(true);
break;
case PeriodicViewRefresh:
- if (!_refreshViewCluster.get()) {
+ // refresh local view cluster data cache
+ boolean cacheChanged =
_viewClusterRefresher.refreshViewClusterDataCache();
+ if (cacheChanged) {
+ logger.info("Detected change in view cluster data, trigger a
refresh");
+ } else if (!_refreshViewCluster.get()) {
logger.info("Skip refresh: No event happened since last refresh, and
no force refresh.");
return;
}
@@ -199,6 +203,7 @@ public class HelixViewAggregator implements
ClusterConfigChangeListener {
// least some of the elements in view cluster
logger.info("Refreshing cluster based on event " +
event.getEventType().name());
refreshViewCluster();
+ _viewClusterRefresher.refreshViewClusterDataCache();
break;
default:
logger.error(String.format("Unrecognized event type: %s",
event.getEventType()));
diff --git
a/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/ViewClusterRefresher.java
b/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/ViewClusterRefresher.java
index 1885a7318..c6df90876 100644
---
a/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/ViewClusterRefresher.java
+++
b/helix-view-aggregator/src/main/java/org/apache/helix/view/aggregator/ViewClusterRefresher.java
@@ -34,6 +34,7 @@ import org.apache.helix.PropertyKey;
import org.apache.helix.PropertyType;
import org.apache.helix.model.ExternalView;
import org.apache.helix.view.dataprovider.SourceClusterDataProvider;
+import org.apache.helix.view.dataprovider.ViewClusterDataCache;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -44,25 +45,18 @@ import org.slf4j.LoggerFactory;
*/
public class ViewClusterRefresher {
private static final Logger logger =
LoggerFactory.getLogger(ViewClusterRefresher.class);
- private String _viewClusterName;
- private HelixDataAccessor _viewClusterDataAccessor;
+ private final String _viewClusterName;
+ private final HelixDataAccessor _viewClusterDataAccessor;
+ private final ViewClusterDataCache _viewClusterDataCache;
private Set<SourceClusterDataProvider> _dataProviderView;
- // These 3 caches stores objects that are pushed to view cluster
(write-through) cache,
- // thus we don't need to read from view cluster everytime we refresh it.
- private Map<String, HelixProperty> _viewClusterLiveInstanceCache;
- private Map<String, HelixProperty> _viewClusterInstanceConfigCache;
- private Map<String, HelixProperty> _viewClusterExternalViewCache;
-
public ViewClusterRefresher(String viewClusterName, HelixDataAccessor
viewClusterDataAccessor) {
_viewClusterName = viewClusterName;
_viewClusterDataAccessor = viewClusterDataAccessor;
- _viewClusterLiveInstanceCache = new HashMap<>();
- _viewClusterInstanceConfigCache = new HashMap<>();
- _viewClusterExternalViewCache = new HashMap<>();
+ _viewClusterDataCache = new ViewClusterDataCache(viewClusterName);
}
- private class ClusterPropertyDiff {
+ private static class ClusterPropertyDiff {
/**
* List of names of objects to set (create or modify)
*/
@@ -126,7 +120,7 @@ public class ViewClusterRefresher {
Set<String> listedNamesInSource = new HashSet<>();
Map<String, HelixProperty> sourceProperties = new HashMap<>();
Map<String, HelixProperty> viewClusterPropertyCache =
- getViewClusterPropertyCache(propertyType);
+ (Map<String, HelixProperty>) getViewClusterPropertyCache(propertyType);
if (viewClusterPropertyCache == null) {
throw new IllegalArgumentException(
"Cannot find view cluster property cache. Property: " +
propertyType.name());
@@ -170,8 +164,7 @@ public class ViewClusterRefresher {
}
// Perform refresh
- ok = doRefresh(propertyType, listedNamesInView, listedNamesInSource,
sourceProperties,
- viewClusterPropertyCache);
+ ok = doRefresh(propertyType, listedNamesInView, listedNamesInSource,
sourceProperties, viewClusterPropertyCache);
} catch (Exception e) {
logger.warn(String
.format("Caught exception during refreshing %s for view cluster %s",
propertyType.name(),
@@ -215,12 +208,11 @@ public class ViewClusterRefresher {
* @param sourcePropertyNames names of all properties (i.e. liveInstances)
in all source clusters
* @param cachedSourceProperties all cached properties from source clusters
* @param viewClusterPropertyCache all properties that are previously set
successfully to view cluster
- * @param <T> extends HelixProperty
* @return ClusterPropertyDiff object contains diff information
*/
- private <T extends HelixProperty> ClusterPropertyDiff calculatePropertyDiff(
+ private ClusterPropertyDiff calculatePropertyDiff(
Set<String> viewPropertyNames, Set<String> sourcePropertyNames,
- Map<String, T> cachedSourceProperties, Map<String, T>
viewClusterPropertyCache) {
+ Map<String, HelixProperty> cachedSourceProperties, Map<String,
HelixProperty> viewClusterPropertyCache) {
ClusterPropertyDiff diff = new ClusterPropertyDiff();
// items whose names are in view cluster but not in source should be
removed for sure
@@ -228,7 +220,7 @@ public class ViewClusterRefresher {
toDelete.removeAll(sourcePropertyNames);
diff.addPropertiesToDelete(toDelete);
- for (Map.Entry<String, T> sourceProperty :
cachedSourceProperties.entrySet()) {
+ for (Map.Entry<String, HelixProperty> sourceProperty :
cachedSourceProperties.entrySet()) {
String name = sourceProperty.getKey();
HelixProperty property = sourceProperty.getValue();
@@ -263,17 +255,15 @@ public class ViewClusterRefresher {
* @param sourcePropertyNames all names of the target properties in source
clusters
* @param cachedSourceProperties all up-to-date cached properties in source
cluster
* @param viewClusterPropertyCache view cluster cache
- * @param <T> extends HelixProperty
* @return true if all required refreshes are successful, else false
*/
- private <T extends HelixProperty> boolean doRefresh(PropertyType
propertyType,
+ private boolean doRefresh(PropertyType propertyType,
Set<String> viewPropertyNames, Set<String> sourcePropertyNames,
- Map<String, T> cachedSourceProperties, Map<String, T>
viewClusterPropertyCache) {
+ Map<String, HelixProperty> cachedSourceProperties, Map<String,
HelixProperty> viewClusterPropertyCache) {
boolean ok = true;
// Calculate diff
ClusterPropertyDiff diff =
- calculatePropertyDiff(viewPropertyNames, sourcePropertyNames,
cachedSourceProperties,
- viewClusterPropertyCache);
+ calculatePropertyDiff(viewPropertyNames, sourcePropertyNames,
cachedSourceProperties, viewClusterPropertyCache);
// Generate property keys
List<PropertyKey> keysToSet = new ArrayList<>();
@@ -293,12 +283,12 @@ public class ViewClusterRefresher {
}
// Delete outdated properties
- if (!deleteProperties(keysToDelete, viewClusterPropertyCache)) {
+ if (!deleteProperties(keysToDelete)) {
ok = false;
}
// Add or update changed properties
- if (!addOrUpdateProperties(keysToSet, diff.getPropertiesToSet(),
viewClusterPropertyCache)) {
+ if (!addOrUpdateProperties(keysToSet, diff.getPropertiesToSet())) {
ok = false;
}
return ok;
@@ -330,14 +320,22 @@ public class ViewClusterRefresher {
}
}
- private Map<String, HelixProperty> getViewClusterPropertyCache(PropertyType
propertyType) {
+ /**
+ * Refresh view cluster data cache and return true if there is data update.
+ * @return true if new change is fetched from remote
+ */
+ boolean refreshViewClusterDataCache() {
+ return _viewClusterDataCache.updateCache(_viewClusterDataAccessor);
+ }
+
+ private Map<String, ? extends HelixProperty>
getViewClusterPropertyCache(PropertyType propertyType) {
switch (propertyType) {
case INSTANCES:
- return _viewClusterInstanceConfigCache;
+ return _viewClusterDataCache.getInstanceConfigMap();
case LIVEINSTANCES:
- return _viewClusterLiveInstanceCache;
+ return _viewClusterDataCache.getLiveInstances();
case EXTERNALVIEW:
- return _viewClusterExternalViewCache;
+ return _viewClusterDataCache.getExternalViews();
default:
return null;
}
@@ -348,28 +346,21 @@ public class ViewClusterRefresher {
* for the objects that got successfully created or updated in ZK
* @param keysToAddOrUpdate
* @param objects
- * @param cache
* @param <T> HelixProperty
*
* @return true if all objects are successfully created or updated, else
false
*/
private <T extends HelixProperty> boolean addOrUpdateProperties(
- List<PropertyKey> keysToAddOrUpdate, List<HelixProperty> objects,
Map<String, T> cache) {
+ List<PropertyKey> keysToAddOrUpdate, List<HelixProperty> objects) {
boolean ok = true;
logger.info(
String.format("AddOrUpdate %s objects: %s", keysToAddOrUpdate.size(),
keysToAddOrUpdate));
boolean[] addOrUpdateResults =
_viewClusterDataAccessor.setChildren(keysToAddOrUpdate, objects);
for (int i = 0; i < addOrUpdateResults.length; i++) {
if (!addOrUpdateResults[i]) {
- // Don't add item to cache yet - will retry during next refresh
logger.warn(String.format("Failed to create or update live instance
%s, will retry later",
keysToAddOrUpdate.get(i).getPath()));
ok = false;
- } else {
- // Successfully updated ViewClusterZK, proceed to update cache
- @SuppressWarnings("unchecked")
- T property = (T) objects.get(i);
- cache.put(getBaseObjectNameFromPropertyKey(keysToAddOrUpdate.get(i)),
property);
}
}
return ok;
@@ -379,38 +370,22 @@ public class ViewClusterRefresher {
* Delete properties in ZK specified by a list of property keys. Update the
given cache
* for the objects that got successfully deleted in ZK
* @param keysToDelete
- * @param cache
* @param <T> HelixProperty
* @return true if all objects got successfully deleted else false
*/
- private <T extends HelixProperty> boolean deleteProperties(List<PropertyKey>
keysToDelete,
- Map<String, T> cache) {
+ private <T extends HelixProperty> boolean deleteProperties(List<PropertyKey>
keysToDelete) {
boolean ok = true;
logger.info(String.format("Deleting %s objects: %s", keysToDelete.size(),
keysToDelete));
for (PropertyKey key : keysToDelete) {
if (!_viewClusterDataAccessor.removeProperty(key)) {
- // Don't remove item from cache yet - will retry during next refresh
ok = false;
logger.warn(String.format("Failed to create or update live instance
%s, will retry later",
key.getPath()));
- } else {
- // Successfully updated ViewClusterZK, proceed to update cache
- cache.remove(getBaseObjectNameFromPropertyKey(key));
}
}
return ok;
}
- /**
- * Parse property key and get the id of the ZNode that property key
represents
- * @param key
- * @return
- */
- private static String getBaseObjectNameFromPropertyKey(PropertyKey key) {
- String[] params = key.getParams();
- return params[params.length - 1];
- }
-
private void logRefreshResult(PropertyType type, boolean ok) {
if (!ok) {
logger.warn(String
diff --git
a/helix-view-aggregator/src/main/java/org/apache/helix/view/dataprovider/ViewClusterDataCache.java
b/helix-view-aggregator/src/main/java/org/apache/helix/view/dataprovider/ViewClusterDataCache.java
new file mode 100644
index 000000000..f9b6a72a4
--- /dev/null
+++
b/helix-view-aggregator/src/main/java/org/apache/helix/view/dataprovider/ViewClusterDataCache.java
@@ -0,0 +1,53 @@
+package org.apache.helix.view.dataprovider;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashMap;
+import java.util.Map;
+import org.apache.helix.HelixDataAccessor;
+import org.apache.helix.common.caches.BasicClusterDataCache;
+import org.apache.helix.model.ExternalView;
+import org.apache.helix.model.InstanceConfig;
+import org.apache.helix.model.LiveInstance;
+
+
+public class ViewClusterDataCache extends BasicClusterDataCache {
+
+ public ViewClusterDataCache(String viewClusterName) {
+ super(viewClusterName);
+ }
+
+ /**
+ * Update cache and return true if any local data is changed compared to
before refresh.
+ * @param dataAccessor the data accessor used to fetch data.
+ * @return true if there is a change to local cache.
+ */
+ public boolean updateCache(HelixDataAccessor dataAccessor) {
+ Map<String, LiveInstance> liveInstanceSnapshot = new
HashMap<>(getLiveInstances());
+ Map<String, ExternalView> externalViewSnapshot = new
HashMap<>(getExternalViews());
+ Map<String, InstanceConfig> instanceConfigSnapshot = new
HashMap<>(getInstanceConfigMap());
+ requireFullRefresh();
+ refresh(dataAccessor);
+
+ return !(liveInstanceSnapshot.equals(getLiveInstances())
+ && externalViewSnapshot.equals(getExternalViews())
+ && instanceConfigSnapshot.equals(getInstanceConfigMap()));
+ }
+}
diff --git
a/helix-view-aggregator/src/test/java/org/apache/helix/view/aggregator/TestViewClusterRefresher.java
b/helix-view-aggregator/src/test/java/org/apache/helix/view/aggregator/TestViewClusterRefresher.java
index 2a069ee31..e642a3044 100644
---
a/helix-view-aggregator/src/test/java/org/apache/helix/view/aggregator/TestViewClusterRefresher.java
+++
b/helix-view-aggregator/src/test/java/org/apache/helix/view/aggregator/TestViewClusterRefresher.java
@@ -103,6 +103,7 @@ public class TestViewClusterRefresher extends ZkTestBase {
Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.INSTANCES));
Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.EXTERNALVIEW));
viewClusterDataAccessor.resetCounters();
+ refresher.refreshViewClusterDataCache();
// Refresh again without change
Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.LIVEINSTANCES));
@@ -147,6 +148,7 @@ public class TestViewClusterRefresher extends ZkTestBase {
new ArrayList<>(sampleProvider.getLiveInstances().values());
liveInstances.add(new LiveInstance("newLiveInstance"));
sampleProvider.setLiveInstances(liveInstances);
+ refresher.refreshViewClusterDataCache();
Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.LIVEINSTANCES));
Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.INSTANCES));
Assert.assertEquals(viewClusterDataAccessor.getSetCount(), 1);
@@ -219,6 +221,7 @@ public class TestViewClusterRefresher extends ZkTestBase {
// remove InstanceConfig and ExternalView requirement from sample provider
sampleProvider.getConfig()
.setProperties(Collections.singletonList(PropertyType.LIVEINSTANCES));
+ refresher.refreshViewClusterDataCache();
// Refresh again
Assert.assertTrue(refresher.refreshPropertiesInViewCluster(PropertyType.LIVEINSTANCES));
diff --git
a/helix-view-aggregator/src/test/java/org/apache/helix/view/dataprovider/TestViewClusterDataCache.java
b/helix-view-aggregator/src/test/java/org/apache/helix/view/dataprovider/TestViewClusterDataCache.java
new file mode 100644
index 000000000..006cd025a
--- /dev/null
+++
b/helix-view-aggregator/src/test/java/org/apache/helix/view/dataprovider/TestViewClusterDataCache.java
@@ -0,0 +1,68 @@
+package org.apache.helix.view.dataprovider;
+
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+import java.util.HashSet;
+import java.util.Set;
+import org.apache.helix.HelixConstants;
+import org.apache.helix.common.ZkTestBase;
+import org.apache.helix.integration.manager.MockParticipantManager;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
+import org.testng.Assert;
+import org.testng.annotations.BeforeClass;
+import org.testng.annotations.Test;
+
+
+public class TestViewClusterDataCache extends ZkTestBase {
+ private static final int NUM_INSTANCE = 5;
+
+ private final String CLUSTER_NAME = CLUSTER_PREFIX + "_" +
getShortClassName();
+ private final Set<String> _instances = new HashSet<>();
+
+ @BeforeClass
+ public void beforeClass() {
+ _gSetupTool.addCluster(CLUSTER_NAME, true);
+ for (int i = 0; i < NUM_INSTANCE; i++) {
+ String instanceName = String.format("%s-%s-%s", CLUSTER_NAME,
PARTICIPANT_PREFIX, i);
+ _gSetupTool.addInstanceToCluster(CLUSTER_NAME, instanceName);
+ _instances.add(instanceName);
+ MockParticipantManager participant = new MockParticipantManager(ZK_ADDR,
CLUSTER_NAME, instanceName);
+ participant.syncStart();
+ }
+ }
+
+ @Test
+ public void testCacheUpdate() {
+ ViewClusterDataCache cache = new ViewClusterDataCache(CLUSTER_NAME);
+ ZKHelixDataAccessor accessor = new ZKHelixDataAccessor(CLUSTER_NAME,
_baseAccessor);
+ Assert.assertTrue(cache.updateCache(accessor));
+ Assert.assertEquals(cache.getLiveInstances().size(), NUM_INSTANCE);
+ Assert.assertEquals(cache.getInstanceConfigMap().size(), NUM_INSTANCE);
+ for (String instance : _instances) {
+ Assert.assertTrue(cache.getLiveInstances().containsKey(instance));
+ }
+ // update again, expect no cache change
+ Assert.assertFalse(cache.updateCache(accessor));
+ // clear cache and update again
+ cache.clearCache(HelixConstants.ChangeType.LIVE_INSTANCE);
+ cache.clearCache(HelixConstants.ChangeType.INSTANCE_CONFIG);
+ Assert.assertTrue(cache.updateCache(accessor));
+ }
+}
diff --git
a/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/TestHelixViewAggregator.java
b/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/TestHelixViewAggregator.java
index adf16ec33..b98b6840e 100644
---
a/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/TestHelixViewAggregator.java
+++
b/helix-view-aggregator/src/test/java/org/apache/helix/view/integration/TestHelixViewAggregator.java
@@ -26,12 +26,14 @@ import java.util.List;
import java.util.Set;
import org.apache.helix.ConfigAccessor;
import org.apache.helix.HelixAdmin;
+import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.NotificationContext;
import org.apache.helix.PropertyType;
import org.apache.helix.api.config.ViewClusterSourceConfig;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.manager.zk.ZKHelixAdmin;
+import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.model.ClusterConfig;
import org.apache.helix.model.Message;
import org.apache.helix.participant.statemachine.StateModelParser;
@@ -214,6 +216,25 @@ public class TestHelixViewAggregator extends
ViewAggregatorIntegrationTestBase {
triggerViewAggregatorStateTransition("LEADER", "STANDBY");
}
+ @Test(dependsOnMethods = "testHelixViewAggregator")
+ public void testRemoteDataRemovalAndRefresh() throws Exception {
+ HelixDataAccessor accessor = new ZKHelixDataAccessor(viewClusterName,
_baseAccessor);
+ // Start view aggregator
+ triggerViewAggregatorStateTransition("STANDBY", "LEADER");
+ // Wait for refresh and verify
+ Thread.sleep((_viewClusterRefreshPeriodSec + 2) * 1000);
+ // remove live instances from view cluster zk data, wait for next refresh
trigger
+
Assert.assertTrue(accessor.removeProperty(accessor.keyBuilder().liveInstances()));
+ Thread.sleep((_viewClusterRefreshPeriodSec + 2) * 1000);
+
Assert.assertTrue(accessor.getChildNames(accessor.keyBuilder().liveInstances()).size()
> 0);
+
+
Assert.assertTrue(accessor.removeProperty(accessor.keyBuilder().externalViews()));
+ Thread.sleep((_viewClusterRefreshPeriodSec + 2) * 1000);
+
Assert.assertTrue(accessor.getChildNames(accessor.keyBuilder().externalViews()).size()
> 0);
+ // Stop view aggregator
+ triggerViewAggregatorStateTransition("LEADER", "STANDBY");
+ }
+
private void resetViewClusterConfig(int refreshPeriod, List<PropertyType>
properties) {
List<ViewClusterSourceConfig> sourceConfigs = new ArrayList<>();
for (String sourceCluster : _allSourceClusters) {