This is an automated email from the ASF dual-hosted git repository. hulee pushed a commit to branch master in repository https://gitbox.apache.org/repos/asf/helix.git
commit b7f6f3c461a136d4e4d6ca9f8ffa48939b450727 Author: Hunter Lee <[email protected]> AuthorDate: Thu Mar 26 17:54:58 2020 -0700 Make multiZkEnabled configurable in HelixRestNamespace (#915) It was observed that we need more fine-grained control over this multiZkEnabled config because there could exists namespaces with differing modes. Because multiple namespaces may be co-deployed, we cannot simply make it a system config. --- .../org/apache/helix/manager/zk/ZKHelixAdmin.java | 6 ++- .../helix/rest/common/HelixRestNamespace.java | 18 ++++++++- .../apache/helix/rest/server/HelixRestMain.java | 4 +- .../apache/helix/rest/server/HelixRestServer.java | 3 +- .../apache/helix/rest/server/ServerContext.java | 43 ++++++++++++++-------- 5 files changed, 52 insertions(+), 22 deletions(-) diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java index 4f5b67e..5ded96d 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java @@ -936,8 +936,10 @@ public class ZKHelixAdmin implements HelixAdmin { public List<String> getClusters() { if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED) || _zkClient instanceof FederatedZkClient) { - throw new UnsupportedOperationException( - "getClusters() is not supported in multi-realm mode! Use Metadata Store Directory Service instead!"); + String errMsg = + "getClusters() is not supported in multi-realm mode! Use Metadata Store Directory Service instead!"; + LOG.error(errMsg); + throw new UnsupportedOperationException(errMsg); } List<String> zkToplevelPathes = _zkClient.getChildren("/"); diff --git a/helix-rest/src/main/java/org/apache/helix/rest/common/HelixRestNamespace.java b/helix-rest/src/main/java/org/apache/helix/rest/common/HelixRestNamespace.java index 0632f36..29b1561 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/common/HelixRestNamespace.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/common/HelixRestNamespace.java @@ -35,6 +35,7 @@ public class HelixRestNamespace { METADATA_STORE_TYPE, METADATA_STORE_ADDRESS, IS_DEFAULT, + MULTI_ZK_ENABLED, MSDS_ENDPOINT } @@ -67,6 +68,11 @@ public class HelixRestNamespace { private boolean _isDefault; /** + * Flag indicating whether this namespace should have multi-zk feature enabled. + */ + private boolean _isMultiZkEnabled; + + /** * Endpoint for accessing MSDS for this namespace. */ private String _msdsEndpoint; @@ -77,15 +83,17 @@ public class HelixRestNamespace { public HelixRestNamespace(String name, HelixMetadataStoreType metadataStoreType, String metadataStoreAddress, boolean isDefault) throws IllegalArgumentException { - this(name, metadataStoreType, metadataStoreAddress, isDefault, null); + this(name, metadataStoreType, metadataStoreAddress, isDefault, false, null); } public HelixRestNamespace(String name, HelixMetadataStoreType metadataStoreType, - String metadataStoreAddress, boolean isDefault, String msdsEndpoint) { + String metadataStoreAddress, boolean isDefault, boolean isMultiZkEnabled, + String msdsEndpoint) { _name = name; _metadataStoreAddress = metadataStoreAddress; _metadataStoreType = metadataStoreType; _isDefault = isDefault; + _isMultiZkEnabled = isMultiZkEnabled; _msdsEndpoint = msdsEndpoint; validate(); } @@ -119,9 +127,15 @@ public class HelixRestNamespace { Map<String, String> ret = new HashMap<>(); ret.put(HelixRestNamespaceProperty.NAME.name(), _name); ret.put(HelixRestNamespaceProperty.IS_DEFAULT.name(), String.valueOf(_isDefault)); + ret.put(HelixRestNamespaceProperty.MULTI_ZK_ENABLED.name(), String.valueOf(_isMultiZkEnabled)); + ret.put(HelixRestNamespaceProperty.MSDS_ENDPOINT.name(), String.valueOf(_msdsEndpoint)); return ret; } + public boolean isMultiZkEnabled() { + return _isMultiZkEnabled; + } + public String getMsdsEndpoint() { return _msdsEndpoint; } diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestMain.java b/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestMain.java index 49940c3..ffc8aed 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestMain.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestMain.java @@ -151,7 +151,9 @@ public class HelixRestMain { HelixRestNamespace.HelixMetadataStoreType.valueOf( config.get(HelixRestNamespace.HelixRestNamespaceProperty.METADATA_STORE_TYPE.name())), config.get(HelixRestNamespace.HelixRestNamespaceProperty.METADATA_STORE_ADDRESS.name()), - false, config.get(HelixRestNamespace.HelixRestNamespaceProperty.MSDS_ENDPOINT.name()))); + false, Boolean.parseBoolean( + config.get(HelixRestNamespace.HelixRestNamespaceProperty.MULTI_ZK_ENABLED.name())), + config.get(HelixRestNamespace.HelixRestNamespaceProperty.MSDS_ENDPOINT.name()))); } } diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestServer.java b/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestServer.java index e6b5b34..cb199cb 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestServer.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/HelixRestServer.java @@ -149,7 +149,8 @@ public class HelixRestServer { // Enable the default statistical monitoring MBean for Jersey server cfg.property(ServerProperties.MONITORING_STATISTICS_MBEANS_ENABLED, true); cfg.property(ContextPropertyKeys.SERVER_CONTEXT.name(), - new ServerContext(namespace.getMetadataStoreAddress(), namespace.getMsdsEndpoint())); + new ServerContext(namespace.getMetadataStoreAddress(), namespace.isMultiZkEnabled(), + namespace.getMsdsEndpoint())); if (type == ServletType.DEFAULT_SERVLET) { cfg.property(ContextPropertyKeys.ALL_NAMESPACES.name(), _helixNamespaces); } else { diff --git a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java index 52f1738..c6632c2 100644 --- a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java +++ b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java @@ -61,6 +61,7 @@ public class ServerContext implements IZkDataListener, IZkChildListener, IZkStat private static final Logger LOG = LoggerFactory.getLogger(ServerContext.class); private final String _zkAddr; + private boolean _isMultiZkEnabled; private final String _msdsEndpoint; private volatile RealmAwareZkClient _zkClient; @@ -83,16 +84,18 @@ public class ServerContext implements IZkDataListener, IZkChildListener, IZkStat private RealmAwareZkClient _zkClientForListener; public ServerContext(String zkAddr) { - this(zkAddr, null); + this(zkAddr, false, null); } /** * Initializes a ServerContext for this namespace. * @param zkAddr routing ZK address (on multi-zk mode) + * @param isMultiZkEnabled boolean flag for whether multi-zk mode is enabled * @param msdsEndpoint if given, this server context will try to read routing data from this MSDS. */ - public ServerContext(String zkAddr, String msdsEndpoint) { + public ServerContext(String zkAddr, boolean isMultiZkEnabled, String msdsEndpoint) { _zkAddr = zkAddr; + _isMultiZkEnabled = isMultiZkEnabled; _msdsEndpoint = msdsEndpoint; // only applicable on multi-zk mode // We should NOT initiate _zkClient and anything that depends on _zkClient in @@ -111,20 +114,9 @@ public class ServerContext implements IZkDataListener, IZkChildListener, IZkStat synchronized (this) { if (_zkClient == null) { // If the multi ZK config is enabled, use FederatedZkClient on multi-realm mode - if (Boolean.parseBoolean(System.getProperty(SystemPropertyKeys.MULTI_ZK_ENABLED))) { - LOG.info("ServerContext: initializing FederatedZkClient with routing ZK at {}!", - _zkAddr); + if (_isMultiZkEnabled || Boolean + .parseBoolean(System.getProperty(SystemPropertyKeys.MULTI_ZK_ENABLED))) { try { - RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder connectionConfigBuilder = - new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder(); - // If MSDS endpoint is set for this namespace, use that instead. - if (_msdsEndpoint != null && !_msdsEndpoint.isEmpty()) { - connectionConfigBuilder.setMsdsEndpoint(_msdsEndpoint); - } - _zkClient = new FederatedZkClient(connectionConfigBuilder.build(), - new RealmAwareZkClient.RealmAwareZkClientConfig() - .setZkSerializer(new ZNRecordSerializer())); - // Make sure the ServerContext is subscribed to routing data change so that it knows // when to reset ZkClient and Helix APIs if (_zkClientForListener == null) { @@ -136,7 +128,20 @@ public class ServerContext implements IZkDataListener, IZkChildListener, IZkStat // Refresh data subscription _zkClientForListener.unsubscribeAll(); _zkClientForListener.subscribeRoutingDataChanges(this, this); + LOG.info("ServerContext: subscribed to routing data in routing ZK at {}!", _zkAddr); + + RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder connectionConfigBuilder = + new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder(); + // If MSDS endpoint is set for this namespace, use that instead. + if (_msdsEndpoint != null && !_msdsEndpoint.isEmpty()) { + connectionConfigBuilder.setMsdsEndpoint(_msdsEndpoint); + } + _zkClient = new FederatedZkClient(connectionConfigBuilder.build(), + new RealmAwareZkClient.RealmAwareZkClientConfig() + .setZkSerializer(new ZNRecordSerializer())); + LOG.info("ServerContext: FederatedZkClient created successfully!"); } catch (IOException | InvalidRoutingDataException | IllegalStateException e) { + LOG.error("Failed to create FederatedZkClient!", e); throw new HelixException("Failed to create FederatedZkClient!", e); } } else { @@ -279,7 +284,13 @@ public class ServerContext implements IZkDataListener, IZkChildListener, IZkStat @Override public void handleDataDeleted(String dataPath) { - // NOP because this is covered by handleChildChange() + if (_zkClientForListener == null || _zkClientForListener.isClosed()) { + return; + } + // Resubscribe + _zkClientForListener.unsubscribeAll(); + _zkClientForListener.subscribeRoutingDataChanges(this, this); + resetZkResources(); } @Override
