This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new 6d9ff4b  Improve the GenericHelixController global tracking record to 
support multiple controller objects for the same cluster in one JVM. (#1676)
6d9ff4b is described below

commit 6d9ff4b1b9d7c405ef1ab10d0b31185e58df0e8f
Author: Jiajun Wang <[email protected]>
AuthorDate: Fri Mar 26 11:50:54 2021 -0700

    Improve the GenericHelixController global tracking record to support 
multiple controller objects for the same cluster in one JVM. (#1676)
    
    This PR refines the GenericHelixController global tracking record so it 
acts as expected even when we have more than one instance in the JVM. In 
general, this impacts test logic only.
---
 .../helix/controller/GenericHelixController.java   | 60 +++++++++++++++++++---
 .../java/org/apache/helix/util/RebalanceUtil.java  |  7 +--
 .../org/apache/helix/tools/TestHelixAdminCli.java  |  5 --
 .../helix/rest/server/TestClusterAccessor.java     |  7 ---
 4 files changed, 56 insertions(+), 23 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
 
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
index 8ba9913..951143d 100644
--- 
a/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
+++ 
b/helix-core/src/main/java/org/apache/helix/controller/GenericHelixController.java
@@ -38,7 +38,9 @@ import java.util.concurrent.ScheduledFuture;
 import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.Collectors;
 
+import com.google.common.collect.ImmutableSet;
 import com.google.common.collect.Sets;
 import org.apache.helix.HelixDataAccessor;
 import org.apache.helix.HelixManager;
@@ -204,7 +206,7 @@ public class GenericHelixController implements 
IdealStateChangeListener, LiveIns
    */
   private long _lastPipelineEndTimestamp;
 
-  private String _clusterName;
+  private final String _clusterName;
   private final Set<Pipeline.Type> _enabledPipelineTypes;
 
   private HelixManager _helixManager;
@@ -225,10 +227,52 @@ public class GenericHelixController implements 
IdealStateChangeListener, LiveIns
    *  1) ZK callback should go to ClusterDataCache and trigger data cache 
refresh only
    *  2) then ClusterDataCache.refresh triggers rebalance pipeline.
    */
-  /* Map of cluster->GenrichelixController */
-  private static Map<String, GenericHelixController> HelixControllerFactory = 
new ConcurrentHashMap<>();
-  public static GenericHelixController getController(String clusterName) {
-    return HelixControllerFactory.get(clusterName);
+  /* Map of cluster->Set of GenericHelixController*/
+  private static Map<String, ImmutableSet<GenericHelixController>> 
_helixControllerFactory =
+      new ConcurrentHashMap<>();
+
+  public static GenericHelixController getLeaderController(String clusterName) 
{
+    if (clusterName != null) {
+      ImmutableSet<GenericHelixController> controllers = 
_helixControllerFactory.get(clusterName);
+      if (controllers != null) {
+        return controllers.stream().filter(controller -> 
controller._helixManager.isLeader())
+            .findAny().orElse(null);
+      }
+    }
+    return null;
+  }
+
+  public static void addController(GenericHelixController controller) {
+    if (controller != null && controller._clusterName != null) {
+      synchronized (_helixControllerFactory) {
+        Set<GenericHelixController> currentControllerSet =
+            _helixControllerFactory.get(controller._clusterName);
+        if (currentControllerSet == null) {
+          _helixControllerFactory.put(controller._clusterName, 
ImmutableSet.of(controller));
+        } else {
+          ImmutableSet.Builder<GenericHelixController> builder = 
ImmutableSet.builder();
+          builder.addAll(currentControllerSet);
+          builder.add(controller);
+          _helixControllerFactory.put(controller._clusterName, 
builder.build());
+        }
+      }
+    }
+  }
+
+  public static void removeController(GenericHelixController controller) {
+    if (controller != null && controller._clusterName != null) {
+      synchronized (_helixControllerFactory) {
+        Set<GenericHelixController> currentControllerSet =
+            _helixControllerFactory.get(controller._clusterName);
+        if (currentControllerSet != null && 
currentControllerSet.contains(controller)) {
+          ImmutableSet.Builder<GenericHelixController> builder = 
ImmutableSet.builder();
+          builder.addAll(
+              currentControllerSet.stream().filter(c -> c.hashCode() != 
controller.hashCode())
+                  .collect(Collectors.toSet()));
+          _helixControllerFactory.put(controller._clusterName, 
builder.build());
+        }
+      }
+    }
   }
 
   /**
@@ -654,9 +698,7 @@ public class GenericHelixController implements 
IdealStateChangeListener, LiveIns
       _taskEventThread = null;
     }
 
-    if (clusterName != null) {
-      HelixControllerFactory.put(clusterName, this);
-    }
+    addController(this);
   }
 
   private void initializeAsyncFIFOWorkers() {
@@ -1346,6 +1388,8 @@ public class GenericHelixController implements 
IdealStateChangeListener, LiveIns
   }
 
   public void shutdown() throws InterruptedException {
+    removeController(this);
+
     stopPeriodRebalance();
     _periodicalRebalanceExecutor.shutdown();
     if (!_periodicalRebalanceExecutor
diff --git a/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java 
b/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
index 145b013..91ec406 100644
--- a/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
+++ b/helix-core/src/main/java/org/apache/helix/util/RebalanceUtil.java
@@ -159,9 +159,10 @@ public class RebalanceUtil {
       LOG.error("Failed to issue a pipeline run. Delay is invalid.");
       return;
     }
-    GenericHelixController controller = 
GenericHelixController.getController(clusterName);
-    if (controller != null) {
-      controller.scheduleOnDemandRebalance(delay, shouldRefreshCache);
+    GenericHelixController leaderController =
+        GenericHelixController.getLeaderController(clusterName);
+    if (leaderController != null) {
+      leaderController.scheduleOnDemandRebalance(delay, shouldRefreshCache);
     } else {
       LOG.error("Failed to issue a pipeline. Controller for cluster {} does 
not exist.",
           clusterName);
diff --git 
a/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminCli.java 
b/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminCli.java
index ae92437..0c27299 100644
--- a/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminCli.java
+++ b/helix-core/src/test/java/org/apache/helix/tools/TestHelixAdminCli.java
@@ -724,11 +724,6 @@ public class TestHelixAdminCli extends ZkTestBase {
     HelixAdmin admin = _gSetupTool.getClusterManagementTool();
     TestHelper.verify(() -> {
       if (admin.getResourceExternalView(grandClusterName, clusterName) == 
null) {
-        // TODO: Remove the following logic once 
https://github.com/apache/helix/issues/1617 is fixed.
-        // TODO: For now, we may need to touch the IdealState to trigger a new 
rebalance since the test
-        // TODO: is running multiple GenericHelixController instances in one 
JVM.
-        IdealState is = admin.getResourceIdealState(grandClusterName, 
clusterName);
-        admin.setResourceIdealState(grandClusterName, clusterName, is);
         return false;
       }
       return true;
diff --git 
a/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
 
b/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
index 2e1561f..b4ec2a0 100644
--- 
a/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
+++ 
b/helix-rest/src/test/java/org/apache/helix/rest/server/TestClusterAccessor.java
@@ -671,13 +671,6 @@ public class TestClusterAccessor extends AbstractTestClass 
{
 
     boolean result = TestHelper.verify(() -> {
       LiveInstance leader = 
normalAccessor.getProperty(normKeyBuilder.controllerLeader());
-      if (leader == null) {
-        // TODO: Remove the following logic once 
https://github.com/apache/helix/issues/1617 is fixed.
-        // TODO: For now, we may need to touch the IdealState to trigger a new 
rebalance since the test
-        // TODO: is running multiple GenericHelixController instances in one 
JVM.
-        IdealState is = 
normalAccessor.getProperty(keyBuilder.idealStates(ACTIVATE_NORM_CLUSTER));
-        
normalAccessor.setProperty(keyBuilder.idealStates(ACTIVATE_NORM_CLUSTER), is);
-      }
       return leader != null;
     }, 12000);
     Assert.assertTrue(result);

Reply via email to