This is an automated email from the ASF dual-hosted git repository.

jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git


The following commit(s) were added to refs/heads/master by this push:
     new a153518  Fix TaskStateModelFactory no msds endpoint multizk situation 
(#1644)
a153518 is described below

commit a1535189f25d895d53badbc862a9bbb96a8deace
Author: Neal Sun <[email protected]>
AuthorDate: Fri Feb 12 11:36:04 2021 -0800

    Fix TaskStateModelFactory no msds endpoint multizk situation (#1644)
    
    Fix TaskStateModelFactory creation when msds endpoint is not included in 
system property. This bug was introduced because when the feature was 
introduced to allow no msds endpoint in system property, the code in this PR 
was still in a feature branch and didn't get updated properly.
    
    Co-authored-by: Neal Sun <[email protected]>
---
 .../apache/helix/manager/zk/ZKHelixManager.java    |  7 ++++
 .../apache/helix/task/TaskStateModelFactory.java   | 30 +++++++++++----
 .../helix/task/TestTaskStateModelFactory.java      | 44 +++++++++++++++++++---
 3 files changed, 67 insertions(+), 14 deletions(-)

diff --git 
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java 
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index 04f4385..890547b 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -692,6 +692,13 @@ public class ZKHelixManager implements HelixManager, 
IZkStateListener {
     return _zkAddress;
   }
 
+  /**
+   * @return the RealmAwareZkConnectionConfig used to create a realm aware 
ZkClient
+   */
+  public RealmAwareZkClient.RealmAwareZkConnectionConfig 
getRealmAwareZkConnectionConfig() {
+    return _realmAwareZkConnectionConfig;
+  }
+
   @Override
   public String getInstanceName() {
     return _instanceName;
diff --git 
a/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java 
b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
index 292c26a..aded724 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
@@ -19,7 +19,6 @@ package org.apache.helix.task;
  * under the License.
  */
 
-import java.io.IOException;
 import java.util.Map;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
@@ -32,6 +31,7 @@ import com.google.common.annotations.VisibleForTesting;
 import org.apache.helix.HelixException;
 import org.apache.helix.HelixManager;
 import org.apache.helix.SystemPropertyKeys;
+import org.apache.helix.manager.zk.ZKHelixManager;
 import org.apache.helix.monitoring.mbeans.ThreadPoolExecutorMonitor;
 import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
 import org.apache.helix.participant.statemachine.StateModelFactory;
@@ -140,19 +140,33 @@ public class TaskStateModelFactory extends 
StateModelFactory<TaskStateModel> {
     // and some connect the manager before registering the state model factory 
(in which case we
     // can use manager's connection). We need to think about the right order 
and determine if we
     // want to enforce it, which may cause backward incompatibility.
+    if (!(manager instanceof ZKHelixManager)) {
+      // TODO: None-ZKHelixManager cannot initialize this class. After 
interface rework of
+      // HelixManager, the initialization should be allowed.
+      throw new UnsupportedOperationException(
+          "Only ZKHelixManager is supported for configurable thread pool.");
+    }
     RealmAwareZkClient.RealmAwareZkClientConfig clientConfig =
         new RealmAwareZkClient.RealmAwareZkClientConfig().setZkSerializer(new 
ZNRecordSerializer());
     String zkAddress = manager.getMetadataStoreConnectionString();
 
     if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED) || zkAddress 
== null) {
-      String clusterName = manager.getClusterName();
-      String shardingKey = HelixUtil.clusterNameToShardingKey(clusterName);
-      RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig =
-          new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder()
-              .setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
-              .setZkRealmShardingKey(shardingKey).build();
+      RealmAwareZkClient.RealmAwareZkConnectionConfig zkConnectionConfig =
+          ((ZKHelixManager) manager).getRealmAwareZkConnectionConfig();
+      // TODO: a fallback logic is created because it's possible for the 
ZKHelixManager to not
+      // have a connection config, since a connection config may be created 
during
+      // ZKHelixManager.connect(). This is the same problem as described 
earlier because connect()
+      // may happen before or after TaskStateModelFactory initialization. 
Clean this up after that
+      // problem is fixed.
+      if (zkConnectionConfig == null) {
+        String clusterName = manager.getClusterName();
+        String shardingKey = HelixUtil.clusterNameToShardingKey(clusterName);
+        zkConnectionConfig = new 
RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder()
+            .setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
+            .setZkRealmShardingKey(shardingKey).build();
+      }
       try {
-        return new FederatedZkClient(connectionConfig, clientConfig);
+        return new FederatedZkClient(zkConnectionConfig, clientConfig);
       } catch (InvalidRoutingDataException | IllegalArgumentException e) {
         throw new HelixException("Failed to create FederatedZkClient!", e);
       }
diff --git 
a/helix-core/src/test/java/org/apache/helix/task/TestTaskStateModelFactory.java 
b/helix-core/src/test/java/org/apache/helix/task/TestTaskStateModelFactory.java
index dc56792..bb64989 100644
--- 
a/helix-core/src/test/java/org/apache/helix/task/TestTaskStateModelFactory.java
+++ 
b/helix-core/src/test/java/org/apache/helix/task/TestTaskStateModelFactory.java
@@ -24,14 +24,16 @@ import java.util.Collections;
 import java.util.HashMap;
 import java.util.Map;
 
-import org.apache.helix.HelixManager;
 import org.apache.helix.SystemPropertyKeys;
 import org.apache.helix.integration.manager.MockParticipantManager;
 import org.apache.helix.integration.task.TaskTestBase;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.mock.MockManager;
 import org.apache.helix.model.InstanceConfig;
 import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants;
 import org.apache.helix.msdcommon.mock.MockMetadataStoreDirectoryServer;
 import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.constant.RoutingDataReaderType;
 import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
 import org.apache.helix.zookeeper.routing.RoutingDataManager;
 import org.mockito.Mockito;
@@ -47,7 +49,7 @@ public class TestTaskStateModelFactory extends TaskTestBase {
       TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE + 1;
 
   @Test
-  public void testConfigAccessorCreationMultiZk() throws Exception {
+  public void testZkClientCreationMultiZk() throws Exception {
     MockParticipantManager anyParticipantManager = _participants[0];
 
     InstanceConfig instanceConfig =
@@ -82,8 +84,10 @@ public class TestTaskStateModelFactory extends TaskTestBase {
     // Turn on multiZk mode in System config
     System.setProperty(SystemPropertyKeys.MULTI_ZK_ENABLED, "true");
     // MSDS endpoint: 
http://localhost:11117/admin/v2/namespaces/testTaskStateModelFactory
+    String testMSDSServerEndpointKey =
+        "http://"; + msdsHostName + ":" + msdsPort + "/admin/v2/namespaces/" + 
msdsNamespace;
     System.setProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY,
-        "http://"; + msdsHostName + ":" + msdsPort + "/admin/v2/namespaces/" + 
msdsNamespace);
+        testMSDSServerEndpointKey);
 
     RoutingDataManager.getInstance().reset();
     RealmAwareZkClient zkClient = 
TaskStateModelFactory.createZkClient(anyParticipantManager);
@@ -94,7 +98,7 @@ public class TestTaskStateModelFactory extends TaskTestBase {
 
     // Turn off multiZk mode in System config, and remove zkAddress
     System.setProperty(SystemPropertyKeys.MULTI_ZK_ENABLED, "false");
-    HelixManager participantManager = Mockito.spy(anyParticipantManager);
+    ZKHelixManager participantManager = Mockito.spy(anyParticipantManager);
     
when(participantManager.getMetadataStoreConnectionString()).thenReturn(null);
     zkClient = TaskStateModelFactory.createZkClient(participantManager);
     Assert.assertEquals(TaskUtil
@@ -102,6 +106,28 @@ public class TestTaskStateModelFactory extends 
TaskTestBase {
             anyParticipantManager.getInstanceName()), 
TEST_TARGET_TASK_THREAD_POOL_SIZE);
     Assert.assertTrue(zkClient instanceof FederatedZkClient);
 
+    // Test no connection config case
+    
when(participantManager.getRealmAwareZkConnectionConfig()).thenReturn(null);
+    zkClient = TaskStateModelFactory.createZkClient(participantManager);
+    Assert.assertEquals(TaskUtil
+        .getTargetThreadPoolSize(zkClient, 
anyParticipantManager.getClusterName(),
+            anyParticipantManager.getInstanceName()), 
TEST_TARGET_TASK_THREAD_POOL_SIZE);
+    Assert.assertTrue(zkClient instanceof FederatedZkClient);
+
+    // Remove server endpoint key and use connection config to specify endpoint
+    System.clearProperty(SystemPropertyKeys.MSDS_SERVER_ENDPOINT_KEY);
+    RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig =
+        new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder()
+            .setRealmMode(RealmAwareZkClient.RealmMode.MULTI_REALM)
+            .setRoutingDataSourceEndpoint(testMSDSServerEndpointKey)
+            
.setRoutingDataSourceType(RoutingDataReaderType.HTTP.name()).build();
+    
when(participantManager.getRealmAwareZkConnectionConfig()).thenReturn(connectionConfig);
+    zkClient = TaskStateModelFactory.createZkClient(participantManager);
+    Assert.assertEquals(TaskUtil
+        .getTargetThreadPoolSize(zkClient, 
anyParticipantManager.getClusterName(),
+            anyParticipantManager.getInstanceName()), 
TEST_TARGET_TASK_THREAD_POOL_SIZE);
+    Assert.assertTrue(zkClient instanceof FederatedZkClient);
+
     // Restore system properties
     if (prevMultiZkEnabled == null) {
       System.clearProperty(SystemPropertyKeys.MULTI_ZK_ENABLED);
@@ -116,8 +142,8 @@ public class TestTaskStateModelFactory extends TaskTestBase 
{
     msds.stopServer();
   }
 
-  @Test(dependsOnMethods = "testConfigAccessorCreationMultiZk")
-  public void testConfigAccessorCreationSingleZk() {
+  @Test(dependsOnMethods = "testZkClientCreationMultiZk")
+  public void testZkClientCreationSingleZk() {
     MockParticipantManager anyParticipantManager = _participants[0];
 
     // Save previously-set system configs
@@ -137,4 +163,10 @@ public class TestTaskStateModelFactory extends 
TaskTestBase {
       System.setProperty(SystemPropertyKeys.MULTI_ZK_ENABLED, 
prevMultiZkEnabled);
     }
   }
+
+  @Test(dependsOnMethods = "testZkClientCreationSingleZk",
+      expectedExceptions = UnsupportedOperationException.class)
+  public void testZkClientCreationNonZKManager() {
+    TaskStateModelFactory.createZkClient(new MockManager());
+  }
 }

Reply via email to