This is an automated email from the ASF dual-hosted git repository.
xyuanlu pushed a commit to branch metaclient
in repository https://gitbox.apache.org/repos/asf/helix.git
The following commit(s) were added to refs/heads/metaclient by this push:
new d2023a706 Metaclient leader election - basics (#2558)
d2023a706 is described below
commit d2023a706a63e82409ad1fff34e4feab89e15486
Author: xyuanlu <[email protected]>
AuthorDate: Tue Jul 18 09:41:55 2023 -0700
Metaclient leader election - basics (#2558)
Co-authored-by: Xiaoyuan Lu <[email protected]>
---
.../org/apache/helix/metaclient/api/OpResult.java | 2 +-
.../helix/metaclient/datamodel/DataRecord.java | 5 +
.../helix/metaclient/impl/zk/ZkMetaClient.java | 50 +++--
.../metaclient/impl/zk/util/ZkMetaClientUtil.java | 2 +-
.../leaderelection/LeaderElectionClient.java | 204 ++++++++++++++++++---
.../recipes/leaderelection/LeaderInfo.java | 34 +++-
.../leaderelection/LeaderInfoSerializer.java | 42 +++++
.../java/org/apache/helix/metaclient/TestUtil.java | 22 +++
.../recipes/leaderelection/TestLeaderElection.java | 64 +++++++
.../datamodel/serializer/ZNRecordSerializer.java | 2 +-
.../helix/zookeeper/zkclient/ZkConnection.java | 1 +
11 files changed, 366 insertions(+), 62 deletions(-)
diff --git
a/meta-client/src/main/java/org/apache/helix/metaclient/api/OpResult.java
b/meta-client/src/main/java/org/apache/helix/metaclient/api/OpResult.java
index effed8543..e3621190a 100644
--- a/meta-client/src/main/java/org/apache/helix/metaclient/api/OpResult.java
+++ b/meta-client/src/main/java/org/apache/helix/metaclient/api/OpResult.java
@@ -26,7 +26,7 @@ import java.util.List;
*/
public class OpResult {
- enum Type {
+ public enum Type {
ERRORRESULT,
GETDATARESULT,
GETCHILDRENRESULT,
diff --git
a/meta-client/src/main/java/org/apache/helix/metaclient/datamodel/DataRecord.java
b/meta-client/src/main/java/org/apache/helix/metaclient/datamodel/DataRecord.java
index 3ed6928f1..91e5409ca 100644
---
a/meta-client/src/main/java/org/apache/helix/metaclient/datamodel/DataRecord.java
+++
b/meta-client/src/main/java/org/apache/helix/metaclient/datamodel/DataRecord.java
@@ -20,6 +20,7 @@ package org.apache.helix.metaclient.datamodel;
*/
import com.fasterxml.jackson.databind.annotation.JsonSerialize;
+import org.apache.helix.metaclient.recipes.leaderelection.LeaderInfo;
import org.apache.helix.zookeeper.datamodel.ZNRecord;
/**
@@ -35,4 +36,8 @@ public class DataRecord extends ZNRecord {
public DataRecord(ZNRecord record) {
super(record);
}
+
+ public DataRecord(DataRecord record, String id) {
+ super(record, id);
+ }
}
diff --git
a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
index a1b6eb1ad..7f68ec9ca 100644
---
a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
+++
b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java
@@ -83,17 +83,15 @@ public class ZkMetaClient<T> implements
MetaClientInterface<T>, AutoCloseable {
// Lock all activities related to ZkClient connection
private ReentrantLock _zkClientConnectionMutex = new ReentrantLock();
-
public ZkMetaClient(ZkMetaClientConfig config) {
_initConnectionTimeout = config.getConnectionInitTimeoutInMillis();
_reconnectTimeout =
config.getMetaClientReconnectPolicy().getAutoReconnectTimeout();
// TODO: Right new ZkClient reconnect using exp backoff with fixed max
backoff interval. We should
// Allow user to config reconnect policy
- _zkClient = new ZkClient(
- new ZkConnection(config.getConnectionAddress(), (int)
config.getSessionTimeoutInMillis()),
+ _zkClient = new ZkClient(new ZkConnection(config.getConnectionAddress(),
(int) config.getSessionTimeoutInMillis()),
(int) _initConnectionTimeout, _reconnectTimeout /*use reconnect
timeout for retry timeout*/,
- config.getZkSerializer(), config.getMonitorType(),
config.getMonitorKey(),
- config.getMonitorInstanceName(), config.getMonitorRootPathOnly(),
false, true);
+ config.getZkSerializer(), config.getMonitorType(),
config.getMonitorKey(), config.getMonitorInstanceName(),
+ config.getMonitorRootPathOnly(), false, true);
_zkClientReconnectMonitor = Executors.newSingleThreadScheduledExecutor();
_reconnectStateChangeListener = new ReconnectStateChangeListener();
}
@@ -102,6 +100,8 @@ public class ZkMetaClient<T> implements
MetaClientInterface<T>, AutoCloseable {
public void create(String key, Object data) {
try {
create(key, data, EntryMode.PERSISTENT);
+ } catch (ZkException e) {
+ throw ZkMetaClientUtil.translateZkExceptionToMetaclientException(e);
} catch (Exception e) {
throw new MetaClientException(e);
}
@@ -110,16 +110,18 @@ public class ZkMetaClient<T> implements
MetaClientInterface<T>, AutoCloseable {
@Override
public void create(String key, Object data, MetaClientInterface.EntryMode
mode) {
- try{
+ try {
_zkClient.create(key, data,
ZkMetaClientUtil.convertMetaClientMode(mode));
- } catch (ZkException | KeeperException e) {
+ } catch (ZkException e) {
+ throw ZkMetaClientUtil.translateZkExceptionToMetaclientException(e);
+ } catch (KeeperException e) {
throw new MetaClientException(e);
}
}
@Override
public void createWithTTL(String key, T data, long ttl) {
- try{
+ try {
_zkClient.createPersistentWithTTL(key, data, ttl);
} catch (ZkException e) {
throw translateZkExceptionToMetaclientException(e);
@@ -178,7 +180,6 @@ public class ZkMetaClient<T> implements
MetaClientInterface<T>, AutoCloseable {
return _zkClient.readData(key, true);
}
-
@Override
public ImmutablePair<T, Stat> getDataAndStat(final String key) {
try {
@@ -231,8 +232,7 @@ public class ZkMetaClient<T> implements
MetaClientInterface<T>, AutoCloseable {
// corresponding callbacks for each operation are invoked in order.
@Override
public void setAsyncExecPoolSize(int poolSize) {
- throw new UnsupportedOperationException(
- "All async calls are executed in a single thread to maintain
sequence.");
+ throw new UnsupportedOperationException("All async calls are executed in a
single thread to maintain sequence.");
}
@Override
@@ -243,8 +243,7 @@ public class ZkMetaClient<T> implements
MetaClientInterface<T>, AutoCloseable {
} catch (ZkException | KeeperException e) {
throw new MetaClientException(e);
}
- _zkClient.asyncCreate(key, data, entryMode,
- new ZkMetaClientCreateCallbackHandler(cb));
+ _zkClient.asyncCreate(key, data, entryMode, new
ZkMetaClientCreateCallbackHandler(cb));
}
@Override
@@ -258,14 +257,12 @@ public class ZkMetaClient<T> implements
MetaClientInterface<T>, AutoCloseable {
@Override
public void asyncGet(String key, AsyncCallback.DataCallback cb) {
- _zkClient.asyncGetData(key,
- new ZkMetaClientGetCallbackHandler(cb));
+ _zkClient.asyncGetData(key, new ZkMetaClientGetCallbackHandler(cb));
}
@Override
public void asyncCountChildren(String key, AsyncCallback.DataCallback cb) {
- throw new NotImplementedException(
- "Currently asyncCountChildren is not supported in ZkMetaClient.");
+ throw new NotImplementedException("Currently asyncCountChildren is not
supported in ZkMetaClient.");
/*
* TODO: Only Helix has potential using this API as of now.
(ZkBaseDataAccessor.getChildren())
* Will move impl from ZkBaseDataAccessor to here when retiring
ZkBaseDataAccessor.
@@ -275,8 +272,7 @@ public class ZkMetaClient<T> implements
MetaClientInterface<T>, AutoCloseable {
@Override
public void asyncExist(String key, AsyncCallback.StatCallback cb) {
- _zkClient.asyncExists(key,
- new ZkMetaClientExistCallbackHandler(cb));
+ _zkClient.asyncExists(key, new ZkMetaClientExistCallbackHandler(cb));
}
public void asyncDelete(String key, AsyncCallback.VoidCallback cb) {
@@ -285,16 +281,14 @@ public class ZkMetaClient<T> implements
MetaClientInterface<T>, AutoCloseable {
@Override
public void asyncTransaction(Iterable<Op> ops,
AsyncCallback.TransactionCallback cb) {
- throw new NotImplementedException(
- "Currently asyncTransaction is not supported in ZkMetaClient.");
+ throw new NotImplementedException("Currently asyncTransaction is not
supported in ZkMetaClient.");
- //TODO: There is no active use case for Async transaction.
+ //TODO: There is no active use case for Async transaction.
}
@Override
public void asyncSet(String key, T data, int version,
AsyncCallback.StatCallback cb) {
- _zkClient.asyncSetData(key, data, version,
- new ZkMetaClientSetCallbackHandler(cb));
+ _zkClient.asyncSetData(key, data, version, new
ZkMetaClientSetCallbackHandler(cb));
}
@Override
@@ -332,8 +326,8 @@ public class ZkMetaClient<T> implements
MetaClientInterface<T>, AutoCloseable {
}
@Override
- public DirectChildSubscribeResult subscribeDirectChildChange(String key,
- DirectChildChangeListener listener, boolean skipWatchingNonExistNode) {
+ public DirectChildSubscribeResult subscribeDirectChildChange(String key,
DirectChildChangeListener listener,
+ boolean skipWatchingNonExistNode) {
ChildrenSubscribeResult result =
_zkClient.subscribeChildChanges(key, new
DirectChildListenerAdapter(listener), skipWatchingNonExistNode);
return new DirectChildSubscribeResult(result.getChildren(),
result.isInstalled());
@@ -466,7 +460,6 @@ public class ZkMetaClient<T> implements
MetaClientInterface<T>, AutoCloseable {
_reconnectMonitorFuture.cancel(true);
LOG.info("ZkClient reconnect monitor thread is canceled");
}
-
} finally {
_zkClientConnectionMutex.unlock();
}
@@ -539,8 +532,7 @@ public class ZkMetaClient<T> implements
MetaClientInterface<T>, AutoCloseable {
cleanUpAndClose(false, true);
}
}, _reconnectTimeout, TimeUnit.MILLISECONDS);
- LOG.info("ZkClient is Disconnected, schedule a reconnect monitor
after {}",
- _reconnectTimeout);
+ LOG.info("ZkClient is Disconnected, schedule a reconnect monitor
after {}", _reconnectTimeout);
}
} finally {
_zkClientConnectionMutex.unlock();
diff --git
a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/util/ZkMetaClientUtil.java
b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/util/ZkMetaClientUtil.java
index dec8711cf..f93e98919 100644
---
a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/util/ZkMetaClientUtil.java
+++
b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/util/ZkMetaClientUtil.java
@@ -39,8 +39,8 @@ import
org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException;
import org.apache.helix.zookeeper.zkclient.exception.ZkException;
import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException;
import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException;
-import org.apache.helix.zookeeper.zkclient.exception.ZkTimeoutException;
import org.apache.helix.zookeeper.zkclient.exception.ZkNodeExistsException;
+import org.apache.helix.zookeeper.zkclient.exception.ZkTimeoutException;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Op;
diff --git
a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java
b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java
index 4f609153b..d5bdb735a 100644
---
a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java
+++
b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java
@@ -19,10 +19,27 @@ package org.apache.helix.metaclient.recipes.leaderelection;
* under the License.
*/
+import java.util.Arrays;
+import java.util.ConcurrentModificationException;
+import java.util.HashSet;
import java.util.List;
+import java.util.Set;
+import org.apache.commons.lang3.tuple.ImmutablePair;
+import org.apache.helix.metaclient.api.DataChangeListener;
import org.apache.helix.metaclient.api.MetaClientInterface;
+import org.apache.helix.metaclient.api.Op;
+import org.apache.helix.metaclient.api.OpResult;
+import org.apache.helix.metaclient.exception.MetaClientException;
+import org.apache.helix.metaclient.exception.MetaClientNoNodeException;
+import org.apache.helix.metaclient.exception.MetaClientNodeExistsException;
import org.apache.helix.metaclient.factories.MetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.apache.helix.metaclient.api.OpResult.Type.*;
/**
@@ -42,7 +59,17 @@ import
org.apache.helix.metaclient.factories.MetaClientConfig;
* When the client is used by a leader election service, one client is created
for each participant.
*
*/
-public class LeaderElectionClient {
+public class LeaderElectionClient implements AutoCloseable {
+
+ private final MetaClientInterface<LeaderInfo> _metaClient;
+ private final String _participant;
+ private static final Logger LOG =
LoggerFactory.getLogger(LeaderElectionClient.class);
+
+ // A list of leader election group that this client joins.
+ private Set<String> _leaderGroups = new HashSet<>();
+
+ private final static String LEADER_ENTRY_KEY = "/LEADER";
+ ReElectListener _reElectListener = new ReElectListener();
/**
* Construct a LeaderElectionClient using a user passed in
leaderElectionConfig. It creates a MetaClient
@@ -53,25 +80,37 @@ public class LeaderElectionClient {
* @param metaClientConfig The config used to create an metaclient.
*/
public LeaderElectionClient(MetaClientConfig metaClientConfig, String
participant) {
-
+ _participant = participant;
+ if (metaClientConfig == null) {
+ throw new IllegalArgumentException("MetaClientConfig cannot be null.");
+ }
+ LOG.info("Creating MetaClient for LeaderElectionClient");
+ if
(MetaClientConfig.StoreType.ZOOKEEPER.equals(metaClientConfig.getStoreType())) {
+ ZkMetaClientConfig zkMetaClientConfig = new
ZkMetaClientConfig.ZkMetaClientConfigBuilder().setConnectionAddress(
+ metaClientConfig.getConnectionAddress()).setZkSerializer((new
LeaderInfoSerializer())).build();
+ _metaClient = new
ZkMetaClientFactory().getMetaClient(zkMetaClientConfig);
+ _metaClient.connect();
+ } else {
+ throw new MetaClientException("Unsupported store type: " +
metaClientConfig.getStoreType());
+ }
}
/**
* Construct a LeaderElectionClient using a user passed in MetaClient object
- * When MetaClient is auto closed be cause of being disconnected and auto
retry connection timed out, user
+ * When MetaClient is auto closed because of being disconnected and auto
retry connection timed out, user
* will need to create a new MetaClient and a new LeaderElectionClient
instance.
*
* @param metaClient metaClient object to be used.
*/
- public LeaderElectionClient(MetaClientInterface metaClient, String
participant) {
-
+ public LeaderElectionClient(MetaClientInterface<LeaderInfo> metaClient,
String participant) {
+ throw new UnsupportedOperationException("Not supported yet.");
}
/**
* Returns true if current participant is the current leadership.
*/
public boolean isLeader(String leaderPath) {
- return false;
+ return getLeader(leaderPath).equalsIgnoreCase(_participant);
}
/**
@@ -79,10 +118,11 @@ public class LeaderElectionClient {
* The Leader Election client maintains and elect an active leader from the
participant pool.
*
* @param leaderPath The path for leader election.
- * @return boolean indicating if the operation is succeeded.
+ * @throws RuntimeException if the operation is not succeeded.
*/
- public boolean joinLeaderElectionParticipantPool(String leaderPath) {
- return false;
+ public void joinLeaderElectionParticipantPool(String leaderPath) {
+ // TODO: create participant entry
+ subscribeAndTryCreateLeaderEntry(leaderPath);
}
/**
@@ -91,10 +131,46 @@ public class LeaderElectionClient {
*
* @param leaderPath The path for leader election.
* @param userInfo Any additional information to associate with this
participant.
- * @return boolean indicating if the operation is succeeded.
+ * @throws RuntimeException if the operation is not succeeded.
*/
- public boolean joinLeaderElectionParticipantPool(String leaderPath, Object
userInfo) {
- return false;
+ public void joinLeaderElectionParticipantPool(String leaderPath, LeaderInfo
userInfo) {
+ // TODO: create participant entry with info
+ subscribeAndTryCreateLeaderEntry(leaderPath);
+ }
+
+ private void subscribeAndTryCreateLeaderEntry(String leaderPath) {
+ _metaClient.subscribeDataChange(leaderPath + LEADER_ENTRY_KEY,
_reElectListener, false);
+ LeaderInfo leaderInfo = new LeaderInfo(LEADER_ENTRY_KEY);
+ leaderInfo.setLeaderName(_participant);
+
+ try {
+ // try to create leader entry, assuming leader election group node is
already there
+ _metaClient.create(leaderPath + LEADER_ENTRY_KEY, leaderInfo,
MetaClientInterface.EntryMode.EPHEMERAL);
+ } catch (MetaClientNodeExistsException ex) {
+ LOG.info("Already a leader for group {}", leaderPath);
+ } catch (MetaClientNoNodeException ex) {
+ try {
+ // try to create leader path root entry
+ _metaClient.create(leaderPath, null);
+ } catch (MetaClientNodeExistsException ignored) {
+ // root entry created by other client, ignore
+ } catch (MetaClientNoNodeException e) {
+ // Parent entry of user provided leader election group path missing.
+ // (e.g. `/a/b` not created in user specified leader election group
path /a/b/c/LeaderGroup)
+ throw new MetaClientException("Parent entry in leaderGroup path" +
leaderPath + " does not exist.");
+ }
+ try {
+ // try to create leader node again.
+ _metaClient.create(leaderPath + LEADER_ENTRY_KEY, leaderInfo,
MetaClientInterface.EntryMode.EPHEMERAL);
+ } catch (MetaClientNoNodeException e) {
+ // Leader group root entry is gone after we checked at outer catch
block.
+ // Meaning other client removed the group. Throw
ConcurrentModificationException.
+ throw new ConcurrentModificationException(
+ "Other client trying to modify the leader election group at the
same time, please retry.", ex);
+ }
+ }
+
+ _leaderGroups.add(leaderPath + LEADER_ENTRY_KEY);
}
/**
@@ -106,23 +182,64 @@ public class LeaderElectionClient {
* Throws exception if the participant is not in the pool.
*
* @param leaderPath The path for leader election.
- * @return boolean indicating if the operation is succeeded.
+ * @throws RuntimeException if the operation is not succeeded.
*
- * @throws RuntimeException If the participant did not join participant pool
via this client. // TODO: define exp type
+ * @throws RuntimeException If the participant did not join participant pool
via this client.
*/
- public boolean exitLeaderElectionParticipantPool(String leaderPath) {
- return false;
+ public void exitLeaderElectionParticipantPool(String leaderPath) {
+ _metaClient.unsubscribeDataChange(leaderPath + LEADER_ENTRY_KEY,
_reElectListener);
+ // TODO: remove from pool folder
+ relinquishLeaderHelper(leaderPath, true);
}
/**
- * Releases leadership for participant.
+ * Releases leadership for participant. Still stays in the participant pool.
*
* @param leaderPath The path for leader election.
*
* @throws RuntimeException if the leadership is not owned by this
participant, or if the
- * participant did not join participant pool via
this client. // TODO: define exp type
+ * participant did not join participant pool via
this client.
*/
public void relinquishLeader(String leaderPath) {
+ relinquishLeaderHelper(leaderPath, false);
+ }
+
+ /**
+ * relinquishLeaderHelper and LeaderElectionParticipantPool if configured
+ * @param leaderPath
+ * @param exitLeaderElectionParticipantPool
+ */
+ private void relinquishLeaderHelper(String leaderPath, Boolean
exitLeaderElectionParticipantPool) {
+ String key = leaderPath + LEADER_ENTRY_KEY;
+ // if current client is in the group
+ if (!_leaderGroups.contains(key)) {
+ throw new MetaClientException("Participant is not in the leader election
group");
+ }
+ // remove leader path from leaderGroups after check if exiting the pool.
+ // to prevent a race condition in In Zk implementation:
+ // If there are delays in ZkClient event queue, it is possible the leader
election client received leader
+ // deleted event after unsubscribeDataChange. We will need to remove it
from in memory `leaderGroups` map before
+ // deleting ZNode. So that handler in ReElectListener won't recreate the
leader node.
+ if (exitLeaderElectionParticipantPool) {
+ _leaderGroups.remove(leaderPath + LEADER_ENTRY_KEY);
+ }
+ // check if current participant is the leader
+ // read data and stats, check, and multi check + delete
+ ImmutablePair<LeaderInfo, MetaClientInterface.Stat> tup =
_metaClient.getDataAndStat(key);
+ if (tup.left.getLeaderName().equalsIgnoreCase(_participant)) {
+ int expectedVersion = tup.right.getVersion();
+ List<Op> ops = Arrays.asList(Op.check(key, expectedVersion),
Op.delete(key, expectedVersion));
+ //Execute transactional support on operations
+ List<OpResult> opResults = _metaClient.transactionOP(ops);
+ if (opResults.get(0).getType() == ERRORRESULT) {
+ if (isLeader(leaderPath)) {
+ // Participant re-elected as leader.
+ throw new ConcurrentModificationException("Concurrent operation,
please retry");
+ } else {
+ LOG.info("Someone else is already leader");
+ }
+ }
+ }
}
/**
@@ -133,6 +250,12 @@ public class LeaderElectionClient {
* @throws RuntimeException when leader path does not exist. // TODO: define
exp type
*/
public String getLeader(String leaderPath) {
+ LeaderInfo leaderInfo = _metaClient.get(leaderPath + LEADER_ENTRY_KEY);
+ return leaderInfo == null ? null : leaderInfo.getLeaderName();
+ }
+
+ public LeaderInfo getParticipantInfo(String leaderPath) {
+ // TODO: add getParticipantInfo impl
return null;
}
@@ -158,14 +281,14 @@ public class LeaderElectionClient {
* get's auto-deleted after TTL or session timeout) or a new leader comes
up, it notifies all
* participants who have been listening on entryChange event.
*
- * An listener will still be installed if the path does not exists yet.
+ * A listener will still be installed if the path does not exist yet.
*
* @param leaderPath The path for leader election that listener is
interested for change.
* @param listener An implementation of LeaderElectionListenerInterface
- * @return an boolean value indicating if registration is success.
+ * @return A boolean value indicating if registration is success.
*/
- public boolean subscribeLeadershipChanges(String leaderPath,
- LeaderElectionListenerInterface listener) {
+ public boolean subscribeLeadershipChanges(String leaderPath,
LeaderElectionListenerInterface listener) {
+ //TODO: add converter class for LeaderElectionListenerInterface
return false;
}
@@ -173,8 +296,41 @@ public class LeaderElectionClient {
* @param leaderPath The path for leader election that listener is no longer
interested for change.
* @param listener An implementation of LeaderElectionListenerInterface
*/
- public void unsubscribeLeadershipChanges(String leaderPath,
- LeaderElectionListenerInterface listener) {
+ public void unsubscribeLeadershipChanges(String leaderPath,
LeaderElectionListenerInterface listener) {
+ }
+
+ @Override
+ public void close() throws Exception {
+
+ // exit all previous joined leader election groups
+ for (String leaderGroup : _leaderGroups) {
+ String leaderGroupPathName =
+ leaderGroup.substring(0, leaderGroup.length() -
LEADER_ENTRY_KEY.length() /*remove '/LEADER' */);
+ exitLeaderElectionParticipantPool(leaderGroupPathName);
+ }
+
+ // TODO: if last participant, remove folder
+ _metaClient.disconnect();
+ }
+
+ class ReElectListener implements DataChangeListener {
+
+ @Override
+ public void handleDataChange(String key, Object data, ChangeType
changeType) throws Exception {
+ if (changeType == ChangeType.ENTRY_CREATED) {
+ LOG.info("new leader {} for leader election group {}.", ((LeaderInfo)
data).getLeaderName(), key);
+ } else if (changeType == ChangeType.ENTRY_DELETED) {
+ if (_leaderGroups.contains(key)) {
+ LeaderInfo lf = new LeaderInfo("LEADER");
+ lf.setLeaderName(_participant);
+ try {
+ _metaClient.create(key, lf,
MetaClientInterface.EntryMode.EPHEMERAL);
+ } catch (MetaClientNodeExistsException ex) {
+ LOG.info("Already a leader {} for leader election group {}.",
((LeaderInfo) data).getLeaderName(), key);
+ }
+ }
+ }
+ }
}
}
diff --git
a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderInfo.java
b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderInfo.java
index 9e817c1c9..ab0562502 100644
---
a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderInfo.java
+++
b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderInfo.java
@@ -19,27 +19,49 @@ package org.apache.helix.metaclient.recipes.leaderelection;
* under the License.
*/
+import com.fasterxml.jackson.annotation.JsonCreator;
+import com.fasterxml.jackson.annotation.JsonIgnore;
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonProperty;
+import com.fasterxml.jackson.databind.annotation.JsonSerialize;
import org.apache.helix.metaclient.datamodel.DataRecord;
/**
* This is the data represent leader election info of a leader election path.
*/
-public class LeaderInfo {
+@JsonIgnoreProperties(ignoreUnknown = true)
+@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL)
+public class LeaderInfo extends DataRecord {
- private String _leaderElectionGroupName;
- private final DataRecord _record;
+ public LeaderInfo(DataRecord dataRecord) {
+ super(dataRecord);
+ }
+ @JsonCreator
+ public LeaderInfo(@JsonProperty("id") String id) {
+ super(id);
+ }
- public LeaderInfo( String leaderElectionGroupName) {
- _leaderElectionGroupName = leaderElectionGroupName;
- _record = new DataRecord(_leaderElectionGroupName);
+ public LeaderInfo(LeaderInfo info, String id) {
+ super(info, id);
}
+
public enum LeaderAttribute {
LEADER_NAME,
PARTICIPANTS
}
+@JsonIgnore(true)
+public String getLeaderName() {
+ return getSimpleField("LEADER_NAME");
+ }
+
+ @JsonIgnore(true)
+ public void setLeaderName(String id) {
+ setSimpleField("LEADER_NAME", id);
+ }
+
}
diff --git
a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderInfoSerializer.java
b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderInfoSerializer.java
new file mode 100644
index 000000000..fd6d256f6
--- /dev/null
+++
b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderInfoSerializer.java
@@ -0,0 +1,42 @@
+package org.apache.helix.metaclient.recipes.leaderelection;
+
+import com.fasterxml.jackson.databind.DeserializationFeature;
+import com.fasterxml.jackson.databind.MapperFeature;
+import java.io.ByteArrayInputStream;
+import org.apache.helix.zookeeper.datamodel.ZNRecord;
+import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer;
+import org.apache.helix.zookeeper.util.GZipCompressionUtil;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+public class LeaderInfoSerializer extends ZNRecordSerializer {
+
+ private static final Logger LOG =
LoggerFactory.getLogger(LeaderInfoSerializer.class);
+
+ @Override
+ public Object deserialize(byte[] bytes) {
+ if (bytes == null || bytes.length == 0) {
+ // reading a parent/null node
+ return null;
+ }
+
+ ByteArrayInputStream bais = new ByteArrayInputStream(bytes);
+
+ mapper.enable(MapperFeature.AUTO_DETECT_FIELDS);
+ mapper.enable(MapperFeature.AUTO_DETECT_SETTERS);
+ mapper.enable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES);
+ try {
+ //decompress the data if its already compressed
+ if (GZipCompressionUtil.isCompressed(bytes)) {
+ byte[] uncompressedBytes = GZipCompressionUtil.uncompress(bais);
+ bais = new ByteArrayInputStream(uncompressedBytes);
+ }
+
+ return mapper.readValue(bais, LeaderInfo.class);
+ } catch (Exception e) {
+ LOG.error("Exception during deserialization of bytes: {}", new
String(bytes), e);
+ return null;
+ }
+ }
+}
diff --git
a/meta-client/src/test/java/org/apache/helix/metaclient/TestUtil.java
b/meta-client/src/test/java/org/apache/helix/metaclient/TestUtil.java
new file mode 100644
index 000000000..9b20c8e53
--- /dev/null
+++ b/meta-client/src/test/java/org/apache/helix/metaclient/TestUtil.java
@@ -0,0 +1,22 @@
+package org.apache.helix.metaclient;
+
+public class TestUtil {
+ public static final long WAIT_DURATION = 6 * 1000L;
+ public interface Verifier {
+ boolean verify()
+ throws Exception;
+ }
+
+ public static boolean verify(Verifier verifier, long timeout)
+ throws Exception {
+ long start = System.currentTimeMillis();
+ do {
+ boolean result = verifier.verify();
+ if (result || (System.currentTimeMillis() - start) > timeout) {
+ return result;
+ }
+ Thread.sleep(50);
+ } while (true);
+ }
+
+}
diff --git
a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java
b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java
new file mode 100644
index 000000000..3a45d97ce
--- /dev/null
+++
b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java
@@ -0,0 +1,64 @@
+package org.apache.helix.metaclient.recipes.leaderelection;
+
+import org.apache.helix.metaclient.TestUtil;
+import org.apache.helix.metaclient.factories.MetaClientConfig;
+import org.apache.helix.metaclient.impl.zk.ZkMetaClientTestBase;
+import org.testng.Assert;
+import org.testng.annotations.Test;
+
+
+public class TestLeaderElection extends ZkMetaClientTestBase {
+
+ private static final String PARTICIPANT_NAME1 = "participant_1";
+ private static final String PARTICIPANT_NAME2 = "participant_2";
+ private static final String LEADER_PATH = "/LEADER_ELECTION_GROUP_1";
+
+ public LeaderElectionClient createLeaderElectionClient(String
participantName) {
+ MetaClientConfig.StoreType storeType =
MetaClientConfig.StoreType.ZOOKEEPER;
+ MetaClientConfig config = new
MetaClientConfig.MetaClientConfigBuilder<>().setConnectionAddress(ZK_ADDR)
+ .setStoreType(storeType).build();
+ return new LeaderElectionClient(config, participantName);
+ }
+
+ @Test
+ public void testAcquireLeadership() throws Exception {
+ // create 2 clients representing 2 participants
+ LeaderElectionClient clt1 = createLeaderElectionClient(PARTICIPANT_NAME1);
+ LeaderElectionClient clt2 = createLeaderElectionClient(PARTICIPANT_NAME2);
+
+ clt1.joinLeaderElectionParticipantPool(LEADER_PATH);
+ clt2.joinLeaderElectionParticipantPool(LEADER_PATH);
+ // First client joining the leader election group should be current leader
+ Assert.assertTrue(TestUtil.verify(() -> {
+ return (clt1.getLeader(LEADER_PATH) != null);
+ }, TestUtil.WAIT_DURATION));
+ Assert.assertNotNull(clt1.getLeader(LEADER_PATH));
+ Assert.assertEquals(clt1.getLeader(LEADER_PATH),
clt2.getLeader(LEADER_PATH));
+ Assert.assertEquals(clt1.getLeader(LEADER_PATH), PARTICIPANT_NAME1);
+
+ // client 1 exit leader election group, and client 2 should be current
leader.
+ clt1.exitLeaderElectionParticipantPool(LEADER_PATH);
+ Assert.assertTrue(TestUtil.verify(() -> {
+ return (clt1.getLeader(LEADER_PATH) != null);
+ }, TestUtil.WAIT_DURATION));
+ Assert.assertTrue(TestUtil.verify(() -> {
+ return (clt1.getLeader(LEADER_PATH).equals(PARTICIPANT_NAME2));
+ }, TestUtil.WAIT_DURATION));
+
+ // client1 join and client2 leave. client 1 should be leader.
+ clt1.joinLeaderElectionParticipantPool(LEADER_PATH);
+ clt2.exitLeaderElectionParticipantPool(LEADER_PATH);
+ Assert.assertTrue(TestUtil.verify(() -> {
+ return (clt1.getLeader(LEADER_PATH) != null);
+ }, TestUtil.WAIT_DURATION));
+ Assert.assertTrue(TestUtil.verify(() -> {
+ return (clt1.getLeader(LEADER_PATH).equals(PARTICIPANT_NAME1));
+ }, TestUtil.WAIT_DURATION));
+ Assert.assertTrue(clt1.isLeader(LEADER_PATH));
+ Assert.assertFalse(clt2.isLeader(LEADER_PATH));
+
+ clt1.close();
+ clt2.close();
+ }
+
+}
diff --git
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/datamodel/serializer/ZNRecordSerializer.java
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/datamodel/serializer/ZNRecordSerializer.java
index 5838cfad7..0cc3215bb 100644
---
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/datamodel/serializer/ZNRecordSerializer.java
+++
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/datamodel/serializer/ZNRecordSerializer.java
@@ -40,7 +40,7 @@ import org.slf4j.LoggerFactory;
public class ZNRecordSerializer implements ZkSerializer {
private static final Logger LOG =
LoggerFactory.getLogger(ZNRecordSerializer.class);
- private static ObjectMapper mapper = new ObjectMapper()
+ protected static ObjectMapper mapper = new ObjectMapper()
// TODO: remove it after upgrading ZNRecord's annotations to Jackson 2
.setAnnotationIntrospector(new CodehausJacksonIntrospector());
diff --git
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java
index 376409231..589425462 100644
---
a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java
+++
b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java
@@ -260,6 +260,7 @@ public class ZkConnection implements IZkConnection {
private void lookupGetChildrenMethod() {
_getChildrenMethod = doLookUpGetChildrenMethod();
+ System.out.println("
ZkSystemPropertyKeys.ZK_GETCHILDREN_PAGINATION_DISABLED " +
GETCHILDREN_PAGINATION_DISABLED);
LOG.info("Pagination config {}={}, method to be invoked: {}",
ZkSystemPropertyKeys.ZK_GETCHILDREN_PAGINATION_DISABLED,
GETCHILDREN_PAGINATION_DISABLED,
_getChildrenMethod.getName());