This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new 6d9ff4b Improve the GenericHelixController global tracking record to
support multiple controller objects for the same cluster in one JVM. (#1676)
6d9ff4b is described below
commit 6d9ff4b1b9d7c405ef1ab10d0b31185e58df0e8f
Author: Jiajun Wang <[email protected]>
AuthorDate: Fri Mar 26 11:50:54 2021 -0700
Improve the GenericHelixController global tracking record to support
multiple controller objects for the same cluster in one JVM. (#1676)
This PR refines the GenericHelixController global tracking record so it
acts as expected even we have more than one instance in the JVM. This impacts
test logic only in general.
---
.../helix/controller/GenericHelixController.java | 60 +++++++++++++++++++---
.../java/org/apache/helix/util/RebalanceUtil.java | 7 +--
.../org/apache/helix/tools/TestHelixAdminCli.java | 5 --
.../helix/rest/server/TestClusterAccessor.java | 7 ---
4 files changed, 56 insertions(+), 23 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 8ba9913..951143d 100644
---
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -38,7 +38,9 @@ import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
+import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
@@ -204,7 +206,7 @@ public class GenericHelixController implements
IdealStateChangeListener, LiveIns
*/
private long _lastPipelineEndTimestamp;
- private String _clusterName;
+ private final String _clusterName;
private final Set<Pipeline.Type> _enabledPipelineTypes;
private HelixManager _helixManager;
@@ -225,10 +227,52 @@ public class GenericHelixController implements
IdealStateChangeListener, LiveIns
* 1) ZK callback should go to ClusterDataCache and trigger data cache
refresh only
* 2) then ClusterDataCache.refresh triggers rebalance pipeline.
*/
- /* Map of cluster->GenrichelixController */
- private static Map<String, GenericHelixController> HelixControllerFactory =
new ConcurrentHashMap<>();
- public static GenericHelixController getController(String clusterName) {
- return HelixControllerFactory.get(clusterName);
+ /* Map of cluster->Set of GenericHelixController*/
+ private static Map<String, ImmutableSet<GenericHelixController>>
_helixControllerFactory =
+ new ConcurrentHashMap<>();
+
+ public static GenericHelixController getLeaderController(String clusterName)
{
+ if (clusterName != null) {
+ ImmutableSet<GenericHelixController> controllers =
_helixControllerFactory.get(clusterName);
+ if (controllers != null) {
+ return controllers.stream().filter(controller ->
controller._helixManager.isLeader())
+ .findAny().orElse(null);
+ }
+ }
+ return null;
+ }
+
+ public static void addController(GenericHelixController controller) {
+ if (controller != null && controller._clusterName != null) {
+ synchronized (_helixControllerFactory) {
+ Set<GenericHelixController> currentControllerSet =
+ _helixControllerFactory.get(controller._clusterName);
+ if (currentControllerSet == null) {
+ _helixControllerFactory.put(controller._clusterName,
ImmutableSet.of(controller));
+ } else {
+ ImmutableSet.Builder<GenericHelixController> builder =
ImmutableSet.builder();
+ builder.addAll(currentControllerSet);
+ builder.add(controller);
+ _helixControllerFactory.put(controller._clusterName,
builder.build());
+ }
+ }
+ }
+ }
+
+ public static void removeController(GenericHelixController controller) {
+ if (controller != null && controller._clusterName != null) {
+ synchronized (_helixControllerFactory) {
+ Set<GenericHelixController> currentControllerSet =
+ _helixControllerFactory.get(controller._clusterName);
+ if (currentControllerSet != null &&
currentControllerSet.contains(controller)) {
+ ImmutableSet.Builder<GenericHelixController> builder =
ImmutableSet.builder();
+ builder.addAll(
+ currentControllerSet.stream().filter(c -> c.hashCode() !=
controller.hashCode())
+ .collect(Collectors.toSet()));
+ _helixControllerFactory.put(controller._clusterName,
builder.build());
+ }
+ }
+ }
}
/**
@@ -654,9 +698,7 @@ public class GenericHelixController implements
IdealStateChangeListener, LiveIns
_taskEventThread = null;
}
- if (clusterName != null) {
- HelixControllerFactory.put(clusterName, this);
- }
+ addController(this);
}
private void initializeAsyncFIFOWorkers() {
@@ -1346,6 +1388,8 @@ public class GenericHelixController implements
IdealStateChangeListener, LiveIns
}
public void shutdown() throws InterruptedException {
+ removeController(this);
+
stopPeriodRebalance();
_periodicalRebalanceExecutor.shutdown();
if (!_periodicalRebalanceExecutor
diff --git a/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
b/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
index 145b013..91ec406 100644
--- a/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
@@ -159,9 +159,10 @@ public class RebalanceUtil {
LOG.error("Failed to issue a pipeline run. Delay is invalid.");
return;
}
- GenericHelixController controller =
GenericHelixController.getController(clusterName);
- if (controller != null) {
- controller.scheduleOnDemandRebalance(delay, shouldRefreshCache);
+ GenericHelixController leaderController =
+ GenericHelixController.getLeaderController(clusterName);
+ if (leaderController != null) {
+ leaderController.scheduleOnDemandRebalance(delay, shouldRefreshCache);
} else {
LOG.error("Failed to issue a pipeline. Controller for cluster {} does
not exist.",
clusterName);
diff --git
a/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminCli.java
b/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminCli.java
index ae92437..0c27299 100644
--- a/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminCli.java
+++ b/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminCli.java
@@ -724,11 +724,6 @@ public class TestHelixAdminCli extends ZkTestBase {
HelixAdmin admin = _gSetupTool.getClusterManagementTool();
TestHelper.verify(() -> {
if (admin.getResourceExternalView(grandClusterName, clusterName) ==
null) {
- // TODO: Remove the following logic once
https://github.com/apache/helix/issues/1617 is fixed.
- // TODO: For now, we may need to touch the IdealState to trigger a new
rebalance since the test
- // TODO: is running multiple GenericHelixController instances in one
JVM.
- IdealState is = admin.getResourceIdealState(grandClusterName,
clusterName);
- admin.setResourceIdealState(grandClusterName, clusterName, is);
return false;
}
return true;
diff --git
a/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
b/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
index 2e1561f..b4ec2a0 100644
---
a/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
+++
b/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
@@ -671,13 +671,6 @@ public class TestClusterAccessor extends AbstractTestClass
{
boolean result = TestHelper.verify(() -> {
LiveInstance leader =
normalAccessor.getProperty(normKeyBuilder.controllerLeader());
- if (leader == null) {
- // TODO: Remove the following logic once
https://github.com/apache/helix/issues/1617 is fixed.
- // TODO: For now, we may need to touch the IdealState to trigger a new
rebalance since the test
- // TODO: is running multiple GenericHelixController instances in one
JVM.
- IdealState is =
normalAccessor.getProperty(keyBuilder.idealStates(ACTIVATE_NORM_CLUSTER));
-
normalAccessor.setProperty(keyBuilder.idealStates(ACTIVATE_NORM_CLUSTER), is);
- }
return leader != null;
}, 12000);
Assert.assertTrue(result);