This is an automated email from the ASF dual-hosted git repository.
hulee pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/master by this push:
new bf40f7e Add Builder to ZKDistributedNonblockingLock to enable
ZooScalability use (#1409)
bf40f7e is described below
commit bf40f7ea34e48f27c71fc247f50b68a5197dcef8
Author: Hunter Lee <[email protected]>
AuthorDate: Fri Oct 2 11:35:38 2020 -0700
Add Builder to ZKDistributedNonblockingLock to enable ZooScalability use
(#1409)
ZKDistributedNonblockingLock and its related helix-lock APIs were not
ZooScalability-compliant, meaning they could not be used in a multi-ZK
environment. This PR updates these APIs so that they can be used in a
multi-ZK environment by adding a Builder that allows users to set multi-ZK
parameters.
---
.../helix/manager/zk/GenericZkHelixApiBuilder.java | 2 -
.../lock/helix/ZKDistributedNonblockingLock.java | 72 +++++++++++++++++++---
2 files changed, 65 insertions(+), 9 deletions(-)
diff --git
a/helix-core/src/main/java/org/apache/helix/manager/zk/GenericZkHelixApiBuilder.java
b/helix-core/src/main/java/org/apache/helix/manager/zk/GenericZkHelixApiBuilder.java
index 0447eee..fd24eb3 100644
---
a/helix-core/src/main/java/org/apache/helix/manager/zk/GenericZkHelixApiBuilder.java
+++
b/helix-core/src/main/java/org/apache/helix/manager/zk/GenericZkHelixApiBuilder.java
@@ -24,11 +24,9 @@ import
org.apache.helix.msdcommon.datamodel.MetadataStoreRoutingData;
import org.apache.helix.msdcommon.exception.InvalidRoutingDataException;
import org.apache.helix.zookeeper.api.client.HelixZkClient;
import org.apache.helix.zookeeper.api.client.RealmAwareZkClient;
-import org.apache.helix.zookeeper.constant.RoutingDataReaderType;
import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
import org.apache.helix.zookeeper.impl.client.FederatedZkClient;
import org.apache.helix.zookeeper.impl.factory.SharedZkClientFactory;
-import org.apache.helix.zookeeper.routing.RoutingDataManager;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
diff --git
a/helix-lock/src/main/java/org/apache/helix/lock/helix/ZKDistributedNonblockingLock.java
b/helix-lock/src/main/java/org/apache/helix/lock/helix/ZKDistributedNonblockingLock.java
index a58d33d..18a9a3a 100644
---
a/helix-lock/src/main/java/org/apache/helix/lock/helix/ZKDistributedNonblockingLock.java
+++
b/helix-lock/src/main/java/org/apache/helix/lock/helix/ZKDistributedNonblockingLock.java
@@ -24,9 +24,11 @@ import java.util.Date;
import org.apache.helix.AccessOption;
import org.apache.helix.BaseDataAccessor;
import org.apache.helix.HelixException;
+import org.apache.helix.SystemPropertyKeys;
import org.apache.helix.lock.DistributedLock;
import org.apache.helix.lock.LockInfo;
import org.apache.helix.lock.LockScope;
+import org.apache.helix.manager.zk.GenericZkHelixApiBuilder;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
import org.apache.helix.zookeeper.zkclient.DataUpdater;
@@ -34,7 +36,9 @@ import org.apache.log4j.Logger;
/**
- * Helix nonblocking lock implementation based on Zookeeper
+ * Helix nonblocking lock implementation based on Zookeeper.
+ * NOTE: do NOT use ephemeral nodes in this implementation because ephemeral mode is not supported
+ * in ZooScalability mode.
*/
public class ZKDistributedNonblockingLock implements DistributedLock {
private static final Logger LOG =
Logger.getLogger(ZKDistributedNonblockingLock.class);
@@ -55,19 +59,19 @@ public class ZKDistributedNonblockingLock implements
DistributedLock {
*/
public ZKDistributedNonblockingLock(LockScope scope, String zkAddress, Long
timeout,
String lockMsg, String userId) {
- this(scope.getPath(), zkAddress, timeout, lockMsg, userId);
+ this(scope.getPath(), timeout, lockMsg, userId, new
ZkBaseDataAccessor<ZNRecord>(zkAddress));
}
/**
* Initialize the lock with user provided information, e.g., lock path under
zookeeper, etc.
* @param lockPath the path of the lock under Zookeeper
- * @param zkAddress the zk address of the cluster
* @param timeout the timeout period of the lock
* @param lockMsg the reason for having this lock
* @param userId a universal unique userId for lock owner identity
+ * @param baseDataAccessor baseDataAccessor instance to do I/O against ZK
with
*/
- private ZKDistributedNonblockingLock(String lockPath, String zkAddress, Long
timeout,
- String lockMsg, String userId) {
+ private ZKDistributedNonblockingLock(String lockPath, Long timeout, String
lockMsg, String userId,
+ BaseDataAccessor<ZNRecord> baseDataAccessor) {
_lockPath = lockPath;
if (timeout < 0) {
throw new IllegalArgumentException("The expiration time cannot be
negative.");
@@ -75,7 +79,7 @@ public class ZKDistributedNonblockingLock implements
DistributedLock {
_timeout = timeout;
_lockMsg = lockMsg;
_userId = userId;
- _baseDataAccessor = new ZkBaseDataAccessor<>(zkAddress);
+ _baseDataAccessor = baseDataAccessor;
}
@Override
@@ -143,11 +147,65 @@ public class ZKDistributedNonblockingLock implements
DistributedLock {
if (!(System.currentTimeMillis() < curLockInfo.getTimeout()) ||
isCurrentOwner()) {
return _record;
}
- // For users who are not the lock owner and try to do an update on a
lock that is held by someone else, exception thrown is to be caught by data
accessor, and return false for the update
+ // For users who are not the lock owner and try to do an update on a lock that is held by
+ // someone else, exception thrown is to be caught by data accessor, and return false for
+ // the update
LOG.error(
"User " + _userId + " tried to update the lock at " + new
Date(System.currentTimeMillis())
+ ". Lock path: " + _lockPath);
throw new HelixException("User is not authorized to perform this
operation.");
}
}
+
+ /**
+ * Builder class to use with ZKDistributedNonblockingLock.
+ */
+ public static class Builder extends GenericZkHelixApiBuilder<Builder> {
+ private LockScope _lockScope;
+ private String _userId;
+ private long _timeout;
+ private String _lockMsg;
+
+ public Builder() {
+ }
+
+ public Builder setLockScope(LockScope lockScope) {
+ _lockScope = lockScope;
+ return this;
+ }
+
+ public Builder setUserId(String userId) {
+ _userId = userId;
+ return this;
+ }
+
+ public Builder setTimeout(long timeout) {
+ _timeout = timeout;
+ return this;
+ }
+
+ public Builder setLockMsg(String lockMsg) {
+ _lockMsg = lockMsg;
+ return this;
+ }
+
+ public ZKDistributedNonblockingLock build() {
+ // Resolve which way we want to create BaseDataAccessor instance
+ BaseDataAccessor<ZNRecord> baseDataAccessor;
+ // If enabled via System.Properties config or the given zkAddress is
null, use ZooScalability
+ if (Boolean.getBoolean(SystemPropertyKeys.MULTI_ZK_ENABLED) ||
_zkAddress == null) {
+ // If the multi ZK config is enabled, use multi-realm mode with
FederatedZkClient
+ baseDataAccessor = new
ZkBaseDataAccessor.Builder<ZNRecord>().setRealmMode(_realmMode)
+ .setRealmAwareZkClientConfig(_realmAwareZkClientConfig)
+
.setRealmAwareZkConnectionConfig(_realmAwareZkConnectionConfig).setZkAddress(_zkAddress)
+ .build();
+ } else {
+ baseDataAccessor = new ZkBaseDataAccessor<>(_zkAddress);
+ }
+
+ // Return a ZKDistributedNonblockingLock instance
+ return new ZKDistributedNonblockingLock(_lockScope.getPath(), _timeout,
_lockMsg, _userId,
+ baseDataAccessor);
+ }
+ }
}