This is an automated email from the ASF dual-hosted git repository.
jiajunwang pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new a153518 Fix TaskStateModelFactory no msds endpoint multizk situation
(#1644)
a153518 is described below
commit a1535189f25d895d53badbc862a9bbb96a8deace
Author: Neal Sun <[email protected]>
AuthorDate: Fri Feb 12 11:36:04 2021 -0800
Fix TaskStateModelFactory no msds endpoint multizk situation (#1644)
Fix TaskStateModelFactory creation when the MSDS endpoint is not set in the
system properties. This bug arose because, when the feature allowing the MSDS
endpoint to be omitted from the system properties was added, the code in this
PR was still on a feature branch and was not updated accordingly.
Co-authored-by: Neal Sun <[email protected]>
---
.../apache/helix/manager/zk/ZKHelixManager.java | 7 ++++
.../apache/helix/task/TaskStateModelFactory.java | 30 +++++++++++----
.../helix/task/TestTaskStateModelFactory.java | 44 +++++++++++++++++++---
3 files changed, 67 insertions(+), 14 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
index 04f4385..890547b 100644
--- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
+++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java
@@ -692,6 +692,13 @@ public class ZKHelixManager implements HelixManager,
IZkStateListener {
return _zkAddress;
}
+ /**
+ * @return the RealmAwareZkConnectionConfig used to create a realm aware
ZkClient
+ */
+ public RealmAwareZkClient.RealmAwareZkConnectionConfig
getRealmAwareZkConnectionConfig() {
+ return _realmAwareZkConnectionConfig;
+ }
+
@Override
public String getInstanceName() {
return _instanceName;
diff --git
a/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
index 292c26a..aded724 100644
--- a/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
+++ b/helix-core/src/main/java/org/apache/helix/task/TaskStateModelFactory.java
@@ -19,7 +19,6 @@ package org.apache.helix.task;
* under the License.
*/
-import java.io.IOException;
import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
@@ -32,6 +31,7 @@ import com.google.common.annotations.VisibleForTesting;
import org.apache.helix.HelixException;
import org.apache.helix.HelixManager;
import org.apache.helix.SystemPropertyKeys;
+import org.apache.helix.manager.zk.ZKHelixManager;
import org.apache.helix.monitoring.mbeans.ThreadPoolExecutorMonitor;
import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
import org.apache.helix.participant.statemachine.StateModelFactory;
@@ -140,19 +140,33 @@ public class TaskStateModelFactory extends
StateModelFactory<TaskStateModel> {
// and some connect the manager before registering the state model factory
(in which case we
// can use manager's connection). We need to think about the right order
and determine if we
// want to enforce it, which may cause backward incompatibility.
+ if (!(manager instanceof ZKHelixManager)) {
+ // TODO: None-ZKHelixManager cannot initialize this class. After
interface rework of
+ // HelixManager, the initialization should be allowed.
+ throw new UnsupportedOperationException(
+ "Only ZKHelixManager is supported for configurable thread pool.");
+ }
RealmAwareZkClient.RealmAwareZkClientConfig clientConfig =
new RealmAwareZkClient.RealmAwareZkClientConfig().setZkSerializer(new
ZNRecordSerializer());
String zkAddress = manager.getMetadataStoreConnectionString();
if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED) || zkAddress
== null) {
- String clusterName = manager.getClusterName();
- String shardingKey = HelixUtil.clusterNameToShardingKey(clusterName);
- RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig =
- new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder()
- .setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
- .setZkRealmShardingKey(shardingKey).build();
+ RealmAwareZkClient.RealmAwareZkConnectionConfig zkConnectionConfig =
+ ((ZKHelixManager) manager).getRealmAwareZkConnectionConfig();
+ // TODO: a fallback logic is created because it's possible for the
ZKHelixManager to not
+ // have a connection config, since a connection config may be created
during
+ // ZKHelixManager.connect(). This is the same problem as described
earlier because connect()
+ // may happen before or after TaskStateModelFactory initialization.
Clean this up after that
+ // problem is fixed.
+ if (zkConnectionConfig == null) {
+ String clusterName = manager.getClusterName();
+ String shardingKey = HelixUtil.clusterNameToShardingKey(clusterName);
+ zkConnectionConfig = new
RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder()
+ .setRealmMode(RealmAwareZkClient.RealmMode.SINGLE_REALM)
+ .setZkRealmShardingKey(shardingKey).build();
+ }
try {
- return new FederatedZkClient(connectionConfig, clientConfig);
+ return new FederatedZkClient(zkConnectionConfig, clientConfig);
} catch (InvalidRoutingDataException | IllegalArgumentException e) {
throw new HelixException("Failed to create FederatedZkClient!", e);
}
diff --git
a/helix-core/src/test/java/org/apache/helix/task/TestTaskStateModelFactory.java
b/helix-core/src/test/java/org/apache/helix/task/TestTaskStateModelFactory.java
index dc56792..bb64989 100644
---
a/helix-core/src/test/java/org/apache/helix/task/TestTaskStateModelFactory.java
+++
b/helix-core/src/test/java/org/apache/helix/task/TestTaskStateModelFactory.java
@@ -24,14 +24,16 @@ import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
-import org.apache.helix.HelixManager;
import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.integration.manager.MockParticipantManager;
import org.apache.helix.integration.task.TaskTestBase;
+import org.apache.helix.manager.zk.ZKHelixManager;
+import org.apache.helix.mock.MockManager;
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.msdcommon.constant.MetadataStoreRoutingConstants;
import org.apache.helix.msdcommon.mock.MockMetadataStoreDirectoryServer;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
+import org.apache.helix.zookeeper.constant.RoutingDataReaderType;
import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
import org.apache.helix.zookeeper.routing.RoutingDataManager;
import org.mockito.Mockito;
@@ -47,7 +49,7 @@ public class TestTaskStateModelFactory extends TaskTestBase {
TaskConstants.DEFAULT_TASK_THREAD_POOL_SIZE + 1;
@Test
- public void testConfigAccessorCreationMultiZk() throws Exception {
+ public void testZkClientCreationMultiZk() throws Exception {
MockParticipantManager anyParticipantManager = _participants[0];
InstanceConfig instanceConfig =
@@ -82,8 +84,10 @@ public class TestTaskStateModelFactory extends TaskTestBase {
// Turn on multiZk mode in System config
System.setProperty(SystemPropertyKeys.MULTI_ZK_ENABLED, "true");
// MSDS endpoint:
http://localhost:11117/admin/v2/namespaces/testTaskStateModelFactory
+ String testMSDSServerEndpointKey =
+ "http://" + msdsHostName + ":" + msdsPort + "/admin/v2/namespaces/" +
msdsNamespace;
System.setProperty(MetadataStoreRoutingConstants.MSDS_SERVER_ENDPOINT_KEY,
- "http://" + msdsHostName + ":" + msdsPort + "/admin/v2/namespaces/" +
msdsNamespace);
+ testMSDSServerEndpointKey);
RoutingDataManager.getInstance().reset();
RealmAwareZkClient zkClient =
TaskStateModelFactory.createZkClient(anyParticipantManager);
@@ -94,7 +98,7 @@ public class TestTaskStateModelFactory extends TaskTestBase {
// Turn off multiZk mode in System config, and remove zkAddress
System.setProperty(SystemPropertyKeys.MULTI_ZK_ENABLED, "false");
- HelixManager participantManager = Mockito.spy(anyParticipantManager);
+ ZKHelixManager participantManager = Mockito.spy(anyParticipantManager);
when(participantManager.getMetadataStoreConnectionString()).thenReturn(null);
zkClient = TaskStateModelFactory.createZkClient(participantManager);
Assert.assertEquals(TaskUtil
@@ -102,6 +106,28 @@ public class TestTaskStateModelFactory extends
TaskTestBase {
anyParticipantManager.getInstanceName()),
TEST_TARGET_TASK_THREAD_POOL_SIZE);
Assert.assertTrue(zkClient instanceof FederatedZkClient);
+ // Test no connection config case
+
when(participantManager.getRealmAwareZkConnectionConfig()).thenReturn(null);
+ zkClient = TaskStateModelFactory.createZkClient(participantManager);
+ Assert.assertEquals(TaskUtil
+ .getTargetThreadPoolSize(zkClient,
anyParticipantManager.getClusterName(),
+ anyParticipantManager.getInstanceName()),
TEST_TARGET_TASK_THREAD_POOL_SIZE);
+ Assert.assertTrue(zkClient instanceof FederatedZkClient);
+
+ // Remove server endpoint key and use connection config to specify endpoint
+ System.clearProperty(SystemPropertyKeys.MSDS_SERVER_ENDPOINT_KEY);
+ RealmAwareZkClient.RealmAwareZkConnectionConfig connectionConfig =
+ new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder()
+ .setRealmMode(RealmAwareZkClient.RealmMode.MULTI_REALM)
+ .setRoutingDataSourceEndpoint(testMSDSServerEndpointKey)
+
.setRoutingDataSourceType(RoutingDataReaderType.HTTP.name()).build();
+
when(participantManager.getRealmAwareZkConnectionConfig()).thenReturn(connectionConfig);
+ zkClient = TaskStateModelFactory.createZkClient(participantManager);
+ Assert.assertEquals(TaskUtil
+ .getTargetThreadPoolSize(zkClient,
anyParticipantManager.getClusterName(),
+ anyParticipantManager.getInstanceName()),
TEST_TARGET_TASK_THREAD_POOL_SIZE);
+ Assert.assertTrue(zkClient instanceof FederatedZkClient);
+
// Restore system properties
if (prevMultiZkEnabled == null) {
System.clearProperty(SystemPropertyKeys.MULTI_ZK_ENABLED);
@@ -116,8 +142,8 @@ public class TestTaskStateModelFactory extends TaskTestBase
{
msds.stopServer();
}
- @Test(dependsOnMethods = "testConfigAccessorCreationMultiZk")
- public void testConfigAccessorCreationSingleZk() {
+ @Test(dependsOnMethods = "testZkClientCreationMultiZk")
+ public void testZkClientCreationSingleZk() {
MockParticipantManager anyParticipantManager = _participants[0];
// Save previously-set system configs
@@ -137,4 +163,10 @@ public class TestTaskStateModelFactory extends
TaskTestBase {
System.setProperty(SystemPropertyKeys.MULTI_ZK_ENABLED,
prevMultiZkEnabled);
}
}
+
+ @Test(dependsOnMethods = "testZkClientCreationSingleZk",
+ expectedExceptions = UnsupportedOperationException.class)
+ public void testZkClientCreationNonZKManager() {
+ TaskStateModelFactory.createZkClient(new MockManager());
+ }
}