This is an automated email from the ASF dual-hosted git repository.
hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new c090ec0 Fix bug in resetting ByteArrayZkClient (#1531)
c090ec0 is described below
commit c090ec0e0371a73cc652d6e4b360ec4241db0ba1
Author: Hunter Lee <[email protected]>
AuthorDate: Tue Nov 17 11:47:47 2020 -0800
Fix bug in resetting ByteArrayZkClient (#1531)
It was discovered that even though there was a routing data change,
_byteArrayZkClient was not being created anew using the newly read routing
data. This would cause issues with the ZooKeeper endpoints provided through
Helix REST. This change cleans up some internal methods and ensures that the
byte array zk client gets nullified every time there is a routing data change.
More specifically, the code was closing byteArrayDataAccessor but not
byteArrayZkClient. This meant that the same byteArrayZkClient would be used
even though everything was reset and routing data was updated. Hence, the data
accessor was not really being created from scratch. This change makes sure that
the underlying byteArrayZkClient gets closed and nullified upon a routing data
change.
---
.../apache/helix/rest/server/ServerContext.java | 58 +++++++++++++---------
1 file changed, 34 insertions(+), 24 deletions(-)
diff --git
a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
index 231284a..5babb19 100644
--- a/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
+++ b/helix-rest/src/main/java/org/apache/helix/rest/server/ServerContext.java
@@ -62,8 +62,8 @@ public class ServerContext implements IZkDataListener,
IZkChildListener, IZkStat
private static final Logger LOG =
LoggerFactory.getLogger(ServerContext.class);
private final String _zkAddr;
- private boolean _isMultiZkEnabled;
private final String _msdsEndpoint;
+ private final boolean _isMultiZkEnabled;
private volatile RealmAwareZkClient _zkClient;
private volatile RealmAwareZkClient _byteArrayZkClient;
@@ -81,7 +81,7 @@ public class ServerContext implements IZkDataListener,
IZkChildListener, IZkStat
/**
* Multi-ZK support
*/
- private ZkMetadataStoreDirectory _zkMetadataStoreDirectory;
+ private final ZkMetadataStoreDirectory _zkMetadataStoreDirectory;
// Create a dedicated ZkClient for listening to data changes in routing data
private RealmAwareZkClient _zkClientForRoutingDataListener;
@@ -119,7 +119,7 @@ public class ServerContext implements IZkDataListener,
IZkChildListener, IZkStat
if (_zkClient == null) {
synchronized (this) {
if (_zkClient == null) {
- _zkClient = createRealmAwareZkClient(_zkClient, true, new
ZNRecordSerializer());
+ _zkClient = createRealmAwareZkClient(new ZNRecordSerializer());
}
}
}
@@ -128,16 +128,13 @@ public class ServerContext implements IZkDataListener,
IZkChildListener, IZkStat
/**
* Returns a RealmAWareZkClient with ByteArraySerializer with double-checked
locking.
- * NOTE: this is different from getRealmAwareZkClient in that it does not
reset listeners for
- * _zkClientForListener because this RealmAwareZkClient is independent from
routing data changes.
* @return
*/
public RealmAwareZkClient getByteArrayRealmAwareZkClient() {
if (_byteArrayZkClient == null) {
synchronized (this) {
if (_byteArrayZkClient == null) {
- _byteArrayZkClient =
- createRealmAwareZkClient(_byteArrayZkClient, false, new
ByteArraySerializer());
+ _byteArrayZkClient = createRealmAwareZkClient(new
ByteArraySerializer());
}
}
}
@@ -146,21 +143,16 @@ public class ServerContext implements IZkDataListener,
IZkChildListener, IZkStat
/**
* Main creation logic for RealmAwareZkClient.
- * @param realmAwareZkClient
- * @param shouldSubscribeToRoutingDataChange if true, it will initialize zk
client to listen on
- * routing data change and refresh
change subscription
* @param zkSerializer the type of ZkSerializer to use
* @return
*/
- private RealmAwareZkClient createRealmAwareZkClient(RealmAwareZkClient
realmAwareZkClient,
- boolean shouldSubscribeToRoutingDataChange, ZkSerializer zkSerializer) {
+ private RealmAwareZkClient createRealmAwareZkClient(ZkSerializer
zkSerializer) {
// If the multi ZK config is enabled, use FederatedZkClient on multi-realm
mode
+ RealmAwareZkClient realmAwareZkClient;
if (_isMultiZkEnabled || Boolean
.parseBoolean(System.getProperty(SystemPropertyKeys.MULTI_ZK_ENABLED))) {
try {
- if (shouldSubscribeToRoutingDataChange) {
- initializeZkClientForRoutingData();
- }
+ initializeZkClientForRoutingData();
RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder
connectionConfigBuilder =
new RealmAwareZkClient.RealmAwareZkConnectionConfig.Builder();
// If MSDS endpoint is set for this namespace, use that instead.
@@ -191,7 +183,7 @@ public class ServerContext implements IZkDataListener,
IZkChildListener, IZkStat
*/
private void initializeZkClientForRoutingData() {
// Make sure the ServerContext is subscribed to routing data change so
that it knows
- // when to reset ZkClient and Helix APIs
+ // when to reset ZkClients and Helix APIs
if (_zkClientForRoutingDataListener == null) {
// Routing data is always in the ZNRecord format
_zkClientForRoutingDataListener = DedicatedZkClientFactory.getInstance()
@@ -362,8 +354,10 @@ public class ServerContext implements IZkDataListener,
IZkChildListener, IZkStat
}
/**
- * Resets all internally cached routing data by closing and nullifying the
ZkClient and Helix APIs.
- * This is okay because routing data update should be infrequent.
+ * Resets all internally cached routing data by closing and nullifying both
zkClient and
+ * byteArrayZkClient and Helix APIs.
+ * This operation is considered costly since it triggers reconnection of all
I/O resources, but
+ * this is okay because routing data update should be infrequent.
*/
private void resetZkResources() {
synchronized (this) {
@@ -372,18 +366,34 @@ public class ServerContext implements IZkDataListener,
IZkChildListener, IZkStat
try {
// Reset RoutingDataManager's cache
RoutingDataManager.getInstance().reset();
- // All Helix APIs will be closed implicitly because ZkClient is closed
+
+ // Close all ZkClients
if (_zkClient != null && !_zkClient.isClosed()) {
_zkClient.close();
}
+ if (_byteArrayZkClient != null && !_byteArrayZkClient.isClosed()) {
+ _byteArrayZkClient.close();
+ }
+ _zkClient = null;
+ _byteArrayZkClient = null;
+
+ // Close all Helix APIs
+ if (_zkHelixAdmin != null) {
+ _zkHelixAdmin.close();
+ _zkHelixAdmin = null;
+ }
+ if (_clusterSetup != null) {
+ _clusterSetup.close();
+ _clusterSetup = null;
+ }
+ if (_configAccessor != null) {
+ _configAccessor.close();
+ _configAccessor = null;
+ }
if (_byteArrayZkBaseDataAccessor != null) {
_byteArrayZkBaseDataAccessor.close();
+ _byteArrayZkBaseDataAccessor = null;
}
- _zkClient = null;
- _zkHelixAdmin = null;
- _clusterSetup = null;
- _configAccessor = null;
- _byteArrayZkBaseDataAccessor = null;
_helixDataAccessorPool.clear();
_taskDriverPool.clear();
} catch (Exception e) {