This is an automated email from the ASF dual-hosted git repository.
hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new acaf13f Make byte array data accessor use RealmAwareZkClient (#1480)
acaf13f is described below
commit acaf13fbfa37a3bd5a80d32cd15d90057ae342b6
Author: Hunter Lee <[email protected]>
AuthorDate: Wed Oct 21 21:08:58 2020 -0700
Make byte array data accessor use RealmAwareZkClient (#1480)
The byte array data accessor was left out of the ZooScalability migration of
helix-rest, which made ZooKeeperAccessor endpoints fail in a multi-ZK setting.
This commit fixes that by making the accessor use RealmAwareZkClient.
---
.../apache/helix/rest/server/ServerContext.java | 166 +++++++++++++--------
1 file changed, 106 insertions(+), 60 deletions(-)
diff --git
a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
index 9bb3098..231284a 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
@@ -31,7 +31,6 @@ import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixException;
import org.apache.helix.InstanceType;
import org.apache.helix.SystemPropertyKeys;
-import org.apache.helix.manager.zk.ByteArraySerializer;
import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
@@ -43,6 +42,7 @@ import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
import org.apache.helix.zookeeper.constant.RoutingDataReaderType;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.datamodel.serializer.ByteArraySerializer;
import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
import org.apache.helix.zookeeper.impl.client.ZkClient;
@@ -52,6 +52,7 @@ import org.apache.helix.zookeeper.routing.RoutingDataManager;
import org.apache.helix.zookeeper.zkclient.IZkChildListener;
import org.apache.helix.zookeeper.zkclient.IZkDataListener;
import org.apache.helix.zookeeper.zkclient.IZkStateListener;
+import org.apache.helix.zookeeper.zkclient.serialize.ZkSerializer;
import org.apache.zookeeper.Watcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -64,6 +65,7 @@ public class ServerContext implements IZkDataListener,
IZkChildListener, IZkStat
private boolean _isMultiZkEnabled;
private final String _msdsEndpoint;
private volatile RealmAwareZkClient _zkClient;
+ private volatile RealmAwareZkClient _byteArrayZkClient;
private volatile ZKHelixAdmin _zkHelixAdmin;
private volatile ClusterSetup _clusterSetup;
@@ -81,7 +83,7 @@ public class ServerContext implements IZkDataListener,
IZkChildListener, IZkStat
*/
private ZkMetadataStoreDirectory _zkMetadataStoreDirectory;
// Create a dedicated ZkClient for listening to data changes in routing data
- private RealmAwareZkClient _zkClientForListener;
+ private RealmAwareZkClient _zkClientForRoutingDataListener;
public ServerContext(String zkAddr) {
this(zkAddr, false, null);
@@ -109,54 +111,99 @@ public class ServerContext implements IZkDataListener,
IZkChildListener, IZkStat
_zkMetadataStoreDirectory = ZkMetadataStoreDirectory.getInstance();
}
+ /**
+ * Lazy initialization of RealmAwareZkClient used throughout the REST server.
+ * @return
+ */
public RealmAwareZkClient getRealmAwareZkClient() {
if (_zkClient == null) {
synchronized (this) {
if (_zkClient == null) {
- // If the multi ZK config is enabled, use FederatedZkClient on
multi-realm mode
- if (_isMultiZkEnabled || Boolean
-
.parseBoolean(System.getProperty(SystemPropertyKeys.MULTI_ZK_ENABLED))) {
- try {
- // Make sure the ServerContext is subscribed to routing data
change so that it knows
- // when to reset ZkClient and Helix APIs
- if (_zkClientForListener == null) {
- _zkClientForListener = DedicatedZkClientFactory.getInstance()
- .buildZkClient(new
HelixZkClient.ZkConnectionConfig(_zkAddr),
- new HelixZkClient.ZkClientConfig()
- .setZkSerializer(new ZNRecordSerializer()));
- }
- // Refresh data subscription
- _zkClientForListener.unsubscribeAll();
- _zkClientForListener.subscribeRoutingDataChanges(this, this);
- LOG.info("ServerContext: subscribed to routing data in routing
ZK at {}!", _zkAddr);
-
- RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder
connectionConfigBuilder =
- new
RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder();
- // If MSDS endpoint is set for this namespace, use that instead.
- if (_msdsEndpoint != null && !_msdsEndpoint.isEmpty()) {
-
connectionConfigBuilder.setRoutingDataSourceEndpoint(_msdsEndpoint)
-
.setRoutingDataSourceType(RoutingDataReaderType.HTTP.name());
- }
- _zkClient = new
FederatedZkClient(connectionConfigBuilder.build(),
- new RealmAwareZkClient.RealmAwareZkClientConfig()
- .setZkSerializer(new ZNRecordSerializer()));
- LOG.info("ServerContext: FederatedZkClient created
successfully!");
- } catch (InvalidRoutingDataException | IllegalStateException e) {
- throw new HelixException("Failed to create FederatedZkClient!",
e);
- }
- } else {
- // If multi ZK config is not set, just connect to the ZK address
given
- HelixZkClient.ZkClientConfig clientConfig = new
HelixZkClient.ZkClientConfig();
- clientConfig.setZkSerializer(new ZNRecordSerializer());
- _zkClient = SharedZkClientFactory.getInstance()
- .buildZkClient(new HelixZkClient.ZkConnectionConfig(_zkAddr),
clientConfig);
- }
+ _zkClient = createRealmAwareZkClient(_zkClient, true, new
ZNRecordSerializer());
}
}
}
return _zkClient;
}
+ /**
+ * Returns a RealmAwareZkClient with ByteArraySerializer with double-checked
locking.
+ * NOTE: this is different from getRealmAwareZkClient in that it does not
reset listeners for
+ * _zkClientForRoutingDataListener because this RealmAwareZkClient is independent from
routing data changes.
+ * @return
+ */
+ public RealmAwareZkClient getByteArrayRealmAwareZkClient() {
+ if (_byteArrayZkClient == null) {
+ synchronized (this) {
+ if (_byteArrayZkClient == null) {
+ _byteArrayZkClient =
+ createRealmAwareZkClient(_byteArrayZkClient, false, new
ByteArraySerializer());
+ }
+ }
+ }
+ return _byteArrayZkClient;
+ }
+
+ /**
+ * Main creation logic for RealmAwareZkClient.
+ * @param realmAwareZkClient
+ * @param shouldSubscribeToRoutingDataChange if true, it will initialize zk
client to listen on
+ * routing data change and refresh
change subscription
+ * @param zkSerializer the type of ZkSerializer to use
+ * @return
+ */
+ private RealmAwareZkClient createRealmAwareZkClient(RealmAwareZkClient
realmAwareZkClient,
+ boolean shouldSubscribeToRoutingDataChange, ZkSerializer zkSerializer) {
+ // If the multi ZK config is enabled, use FederatedZkClient on multi-realm
mode
+ if (_isMultiZkEnabled || Boolean
+
.parseBoolean(System.getProperty(SystemPropertyKeys.MULTI_ZK_ENABLED))) {
+ try {
+ if (shouldSubscribeToRoutingDataChange) {
+ initializeZkClientForRoutingData();
+ }
+ RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder
connectionConfigBuilder =
+ new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder();
+ // If MSDS endpoint is set for this namespace, use that instead.
+ if (_msdsEndpoint != null && !_msdsEndpoint.isEmpty()) {
+ connectionConfigBuilder.setRoutingDataSourceEndpoint(_msdsEndpoint)
+ .setRoutingDataSourceType(RoutingDataReaderType.HTTP.name());
+ }
+ realmAwareZkClient = new
FederatedZkClient(connectionConfigBuilder.build(),
+ new
RealmAwareZkClient.RealmAwareZkClientConfig().setZkSerializer(zkSerializer));
+ LOG.info("ServerContext: FederatedZkClient created successfully!");
+ } catch (InvalidRoutingDataException | IllegalStateException e) {
+ throw new HelixException("Failed to create FederatedZkClient!", e);
+ }
+ } else {
+ // If multi ZK config is not set, just connect to the ZK address given
+ HelixZkClient.ZkClientConfig clientConfig = new
HelixZkClient.ZkClientConfig();
+ clientConfig.setZkSerializer(zkSerializer);
+ realmAwareZkClient = SharedZkClientFactory.getInstance()
+ .buildZkClient(new HelixZkClient.ZkConnectionConfig(_zkAddr),
clientConfig);
+ }
+ return realmAwareZkClient;
+ }
+
+ /**
+ * Initialization logic for ZkClient for routing data listener.
+ * NOTE: The initialization lifecycle of zkClientForRoutingDataListener is
tied to the private
+ * volatile zkClient.
+ */
+ private void initializeZkClientForRoutingData() {
+ // Make sure the ServerContext is subscribed to routing data change so
that it knows
+ // when to reset ZkClient and Helix APIs
+ if (_zkClientForRoutingDataListener == null) {
+ // Routing data is always in the ZNRecord format
+ _zkClientForRoutingDataListener = DedicatedZkClientFactory.getInstance()
+ .buildZkClient(new HelixZkClient.ZkConnectionConfig(_zkAddr),
+ new HelixZkClient.ZkClientConfig().setZkSerializer(new
ZNRecordSerializer()));
+ }
+ // Refresh data subscription
+ _zkClientForRoutingDataListener.unsubscribeAll();
+ _zkClientForRoutingDataListener.subscribeRoutingDataChanges(this, this);
+ LOG.info("ServerContext: subscribed to routing data in routing ZK at {}!",
_zkAddr);
+ }
+
@Deprecated
public ZkClient getZkClient() {
return (ZkClient) getRealmAwareZkClient();
@@ -232,8 +279,7 @@ public class ServerContext implements IZkDataListener,
IZkChildListener, IZkStat
if (_byteArrayZkBaseDataAccessor == null) {
synchronized (this) {
if (_byteArrayZkBaseDataAccessor == null) {
- _byteArrayZkBaseDataAccessor =
- new ZkBaseDataAccessor<>(_zkAddr, new ByteArraySerializer());
+ _byteArrayZkBaseDataAccessor = new
ZkBaseDataAccessor<>(getByteArrayRealmAwareZkClient());
}
}
}
@@ -247,25 +293,25 @@ public class ServerContext implements IZkDataListener,
IZkChildListener, IZkStat
if (_zkMetadataStoreDirectory != null) {
_zkMetadataStoreDirectory.close();
}
- if (_zkClientForListener != null) {
- _zkClientForListener.close();
+ if (_zkClientForRoutingDataListener != null) {
+ _zkClientForRoutingDataListener.close();
}
}
@Override
public void handleChildChange(String parentPath, List<String> currentChilds)
{
- if (_zkClientForListener == null || _zkClientForListener.isClosed()) {
+ if (_zkClientForRoutingDataListener == null ||
_zkClientForRoutingDataListener.isClosed()) {
return;
}
// Resubscribe
- _zkClientForListener.unsubscribeAll();
- _zkClientForListener.subscribeRoutingDataChanges(this, this);
+ _zkClientForRoutingDataListener.unsubscribeAll();
+ _zkClientForRoutingDataListener.subscribeRoutingDataChanges(this, this);
resetZkResources();
}
@Override
public void handleDataChange(String dataPath, Object data) {
- if (_zkClientForListener == null || _zkClientForListener.isClosed()) {
+ if (_zkClientForRoutingDataListener == null ||
_zkClientForRoutingDataListener.isClosed()) {
return;
}
resetZkResources();
@@ -273,45 +319,45 @@ public class ServerContext implements IZkDataListener,
IZkChildListener, IZkStat
@Override
public void handleDataDeleted(String dataPath) {
- if (_zkClientForListener == null || _zkClientForListener.isClosed()) {
+ if (_zkClientForRoutingDataListener == null ||
_zkClientForRoutingDataListener.isClosed()) {
return;
}
// Resubscribe
- _zkClientForListener.unsubscribeAll();
- _zkClientForListener.subscribeRoutingDataChanges(this, this);
+ _zkClientForRoutingDataListener.unsubscribeAll();
+ _zkClientForRoutingDataListener.subscribeRoutingDataChanges(this, this);
resetZkResources();
}
@Override
public void handleStateChanged(Watcher.Event.KeeperState state) {
- if (_zkClientForListener == null || _zkClientForListener.isClosed()) {
+ if (_zkClientForRoutingDataListener == null ||
_zkClientForRoutingDataListener.isClosed()) {
return;
}
// Resubscribe
- _zkClientForListener.unsubscribeAll();
- _zkClientForListener.subscribeRoutingDataChanges(this, this);
+ _zkClientForRoutingDataListener.unsubscribeAll();
+ _zkClientForRoutingDataListener.subscribeRoutingDataChanges(this, this);
resetZkResources();
}
@Override
public void handleNewSession(String sessionId) {
- if (_zkClientForListener == null || _zkClientForListener.isClosed()) {
+ if (_zkClientForRoutingDataListener == null ||
_zkClientForRoutingDataListener.isClosed()) {
return;
}
// Resubscribe
- _zkClientForListener.unsubscribeAll();
- _zkClientForListener.subscribeRoutingDataChanges(this, this);
+ _zkClientForRoutingDataListener.unsubscribeAll();
+ _zkClientForRoutingDataListener.subscribeRoutingDataChanges(this, this);
resetZkResources();
}
@Override
public void handleSessionEstablishmentError(Throwable error) {
- if (_zkClientForListener == null || _zkClientForListener.isClosed()) {
+ if (_zkClientForRoutingDataListener == null ||
_zkClientForRoutingDataListener.isClosed()) {
return;
}
// Resubscribe
- _zkClientForListener.unsubscribeAll();
- _zkClientForListener.subscribeRoutingDataChanges(this, this);
+ _zkClientForRoutingDataListener.unsubscribeAll();
+ _zkClientForRoutingDataListener.subscribeRoutingDataChanges(this, this);
resetZkResources();
}