This is an automated email from the ASF dual-hosted git repository. jxue pushed a commit to branch metaclient in repository https://gitbox.apache.org/repos/asf/helix.git
commit 294fae2c00082b7478bb476554cab2729b23485a Author: xyuanlu <xyua...@gmail.com> AuthorDate: Tue Jul 18 09:41:55 2023 -0700 Metaclient leader election - basics (#2558) Co-authored-by: Xiaoyuan Lu <xi...@xialu-mn2.linkedin.biz> --- .../org/apache/helix/metaclient/api/OpResult.java | 2 +- .../helix/metaclient/datamodel/DataRecord.java | 5 + .../helix/metaclient/impl/zk/ZkMetaClient.java | 50 +++-- .../metaclient/impl/zk/util/ZkMetaClientUtil.java | 2 +- .../leaderelection/LeaderElectionClient.java | 204 ++++++++++++++++++--- .../recipes/leaderelection/LeaderInfo.java | 34 +++- .../leaderelection/LeaderInfoSerializer.java | 42 +++++ .../java/org/apache/helix/metaclient/TestUtil.java | 22 +++ .../recipes/leaderelection/TestLeaderElection.java | 64 +++++++ .../datamodel/serializer/ZNRecordSerializer.java | 2 +- .../helix/zookeeper/zkclient/ZkConnection.java | 1 + 11 files changed, 366 insertions(+), 62 deletions(-) diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/api/OpResult.java b/meta-client/src/main/java/org/apache/helix/metaclient/api/OpResult.java index effed8543..e3621190a 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/api/OpResult.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/api/OpResult.java @@ -26,7 +26,7 @@ import java.util.List; */ public class OpResult { - enum Type { + public enum Type { ERRORRESULT, GETDATARESULT, GETCHILDRENRESULT, diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/datamodel/DataRecord.java b/meta-client/src/main/java/org/apache/helix/metaclient/datamodel/DataRecord.java index 3ed6928f1..91e5409ca 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/datamodel/DataRecord.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/datamodel/DataRecord.java @@ -20,6 +20,7 @@ package org.apache.helix.metaclient.datamodel; */ import com.fasterxml.jackson.databind.annotation.JsonSerialize; +import org.apache.helix.metaclient.recipes.leaderelection.LeaderInfo; import org.apache.helix.zookeeper.datamodel.ZNRecord; /** @@ -35,4 +36,8 @@ public class DataRecord extends ZNRecord { public DataRecord(ZNRecord record) { super(record); } + + public DataRecord(DataRecord record, String id) { + super(record, id); + } } diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java index a1b6eb1ad..7f68ec9ca 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/ZkMetaClient.java @@ -83,17 +83,15 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable { // Lock all activities related to ZkClient connection private ReentrantLock _zkClientConnectionMutex = new ReentrantLock(); - public ZkMetaClient(ZkMetaClientConfig config) { _initConnectionTimeout = config.getConnectionInitTimeoutInMillis(); _reconnectTimeout = config.getMetaClientReconnectPolicy().getAutoReconnectTimeout(); // TODO: Right new ZkClient reconnect using exp backoff with fixed max backoff interval. We should // Allow user to config reconnect policy - _zkClient = new ZkClient( - new ZkConnection(config.getConnectionAddress(), (int) config.getSessionTimeoutInMillis()), + _zkClient = new ZkClient(new ZkConnection(config.getConnectionAddress(), (int) config.getSessionTimeoutInMillis()), (int) _initConnectionTimeout, _reconnectTimeout /*use reconnect timeout for retry timeout*/, - config.getZkSerializer(), config.getMonitorType(), config.getMonitorKey(), - config.getMonitorInstanceName(), config.getMonitorRootPathOnly(), false, true); + config.getZkSerializer(), config.getMonitorType(), config.getMonitorKey(), config.getMonitorInstanceName(), + config.getMonitorRootPathOnly(), false, true); _zkClientReconnectMonitor = Executors.newSingleThreadScheduledExecutor(); _reconnectStateChangeListener = new ReconnectStateChangeListener(); } @@ -102,6 +100,8 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable { public void create(String key, Object data) { try { create(key, data, EntryMode.PERSISTENT); + } catch (ZkException e) { + throw ZkMetaClientUtil.translateZkExceptionToMetaclientException(e); } catch (Exception e) { throw new MetaClientException(e); } @@ -110,16 +110,18 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable { @Override public void create(String key, Object data, MetaClientInterface.EntryMode mode) { - try{ + try { _zkClient.create(key, data, ZkMetaClientUtil.convertMetaClientMode(mode)); - } catch (ZkException | KeeperException e) { + } catch (ZkException e) { + throw ZkMetaClientUtil.translateZkExceptionToMetaclientException(e); + } catch (KeeperException e) { throw new MetaClientException(e); } } @Override public void createWithTTL(String key, T data, long ttl) { - try{ + try { _zkClient.createPersistentWithTTL(key, data, ttl); } catch (ZkException e) { throw translateZkExceptionToMetaclientException(e); @@ -178,7 +180,6 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable { return _zkClient.readData(key, true); } - @Override public ImmutablePair<T, Stat> getDataAndStat(final String key) { try { @@ -231,8 +232,7 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable { // corresponding callbacks for each operation are invoked in order. @Override public void setAsyncExecPoolSize(int poolSize) { - throw new UnsupportedOperationException( - "All async calls are executed in a single thread to maintain sequence."); + throw new UnsupportedOperationException("All async calls are executed in a single thread to maintain sequence."); } @Override @@ -243,8 +243,7 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable { } catch (ZkException | KeeperException e) { throw new MetaClientException(e); } - _zkClient.asyncCreate(key, data, entryMode, - new ZkMetaClientCreateCallbackHandler(cb)); + _zkClient.asyncCreate(key, data, entryMode, new ZkMetaClientCreateCallbackHandler(cb)); } @Override @@ -258,14 +257,12 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable { @Override public void asyncGet(String key, AsyncCallback.DataCallback cb) { - _zkClient.asyncGetData(key, - new ZkMetaClientGetCallbackHandler(cb)); + _zkClient.asyncGetData(key, new ZkMetaClientGetCallbackHandler(cb)); } @Override public void asyncCountChildren(String key, AsyncCallback.DataCallback cb) { - throw new NotImplementedException( - "Currently asyncCountChildren is not supported in ZkMetaClient."); + throw new NotImplementedException("Currently asyncCountChildren is not supported in ZkMetaClient."); /* * TODO: Only Helix has potential using this API as of now. (ZkBaseDataAccessor.getChildren()) * Will move impl from ZkBaseDataAccessor to here when retiring ZkBaseDataAccessor. @@ -275,8 +272,7 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable { @Override public void asyncExist(String key, AsyncCallback.StatCallback cb) { - _zkClient.asyncExists(key, - new ZkMetaClientExistCallbackHandler(cb)); + _zkClient.asyncExists(key, new ZkMetaClientExistCallbackHandler(cb)); } public void asyncDelete(String key, AsyncCallback.VoidCallback cb) { @@ -285,16 +281,14 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable { @Override public void asyncTransaction(Iterable<Op> ops, AsyncCallback.TransactionCallback cb) { - throw new NotImplementedException( - "Currently asyncTransaction is not supported in ZkMetaClient."); + throw new NotImplementedException("Currently asyncTransaction is not supported in ZkMetaClient."); - //TODO: There is no active use case for Async transaction. + //TODO: There is no active use case for Async transaction. } @Override public void asyncSet(String key, T data, int version, AsyncCallback.StatCallback cb) { - _zkClient.asyncSetData(key, data, version, - new ZkMetaClientSetCallbackHandler(cb)); + _zkClient.asyncSetData(key, data, version, new ZkMetaClientSetCallbackHandler(cb)); } @Override @@ -332,8 +326,8 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable { } @Override - public DirectChildSubscribeResult subscribeDirectChildChange(String key, - DirectChildChangeListener listener, boolean skipWatchingNonExistNode) { + public DirectChildSubscribeResult subscribeDirectChildChange(String key, DirectChildChangeListener listener, + boolean skipWatchingNonExistNode) { ChildrenSubscribeResult result = _zkClient.subscribeChildChanges(key, new DirectChildListenerAdapter(listener), skipWatchingNonExistNode); return new DirectChildSubscribeResult(result.getChildren(), result.isInstalled()); @@ -466,7 +460,6 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable { _reconnectMonitorFuture.cancel(true); LOG.info("ZkClient reconnect monitor thread is canceled"); } - } finally { _zkClientConnectionMutex.unlock(); } @@ -539,8 +532,7 @@ public class ZkMetaClient<T> implements MetaClientInterface<T>, AutoCloseable { cleanUpAndClose(false, true); } }, _reconnectTimeout, TimeUnit.MILLISECONDS); - LOG.info("ZkClient is Disconnected, schedule a reconnect monitor after {}", - _reconnectTimeout); + LOG.info("ZkClient is Disconnected, schedule a reconnect monitor after {}", _reconnectTimeout); } } finally { _zkClientConnectionMutex.unlock(); diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/util/ZkMetaClientUtil.java b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/util/ZkMetaClientUtil.java index dec8711cf..f93e98919 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/util/ZkMetaClientUtil.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/impl/zk/util/ZkMetaClientUtil.java @@ -39,8 +39,8 @@ import org.apache.helix.zookeeper.zkclient.exception.ZkBadVersionException; import org.apache.helix.zookeeper.zkclient.exception.ZkException; import org.apache.helix.zookeeper.zkclient.exception.ZkInterruptedException; import org.apache.helix.zookeeper.zkclient.exception.ZkNoNodeException; -import org.apache.helix.zookeeper.zkclient.exception.ZkTimeoutException; import org.apache.helix.zookeeper.zkclient.exception.ZkNodeExistsException; +import org.apache.helix.zookeeper.zkclient.exception.ZkTimeoutException; import org.apache.zookeeper.CreateMode; import org.apache.zookeeper.KeeperException; import org.apache.zookeeper.Op; diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java index 4f609153b..d5bdb735a 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderElectionClient.java @@ -19,10 +19,27 @@ package org.apache.helix.metaclient.recipes.leaderelection; * under the License. */ +import java.util.Arrays; +import java.util.ConcurrentModificationException; +import java.util.HashSet; import java.util.List; +import java.util.Set; +import org.apache.commons.lang3.tuple.ImmutablePair; +import org.apache.helix.metaclient.api.DataChangeListener; import org.apache.helix.metaclient.api.MetaClientInterface; +import org.apache.helix.metaclient.api.Op; +import org.apache.helix.metaclient.api.OpResult; +import org.apache.helix.metaclient.exception.MetaClientException; +import org.apache.helix.metaclient.exception.MetaClientNoNodeException; +import org.apache.helix.metaclient.exception.MetaClientNodeExistsException; import org.apache.helix.metaclient.factories.MetaClientConfig; +import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientConfig; +import org.apache.helix.metaclient.impl.zk.factory.ZkMetaClientFactory; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import static org.apache.helix.metaclient.api.OpResult.Type.*; /** @@ -42,7 +59,17 @@ import org.apache.helix.metaclient.factories.MetaClientConfig; * When the client is used by a leader election service, one client is created for each participant. * */ -public class LeaderElectionClient { +public class LeaderElectionClient implements AutoCloseable { + + private final MetaClientInterface<LeaderInfo> _metaClient; + private final String _participant; + private static final Logger LOG = LoggerFactory.getLogger(LeaderElectionClient.class); + + // A list of leader election group that this client joins. + private Set<String> _leaderGroups = new HashSet<>(); + + private final static String LEADER_ENTRY_KEY = "/LEADER"; + ReElectListener _reElectListener = new ReElectListener(); /** * Construct a LeaderElectionClient using a user passed in leaderElectionConfig. It creates a MetaClient @@ -53,25 +80,37 @@ public class LeaderElectionClient { * @param metaClientConfig The config used to create an metaclient. */ public LeaderElectionClient(MetaClientConfig metaClientConfig, String participant) { - + _participant = participant; + if (metaClientConfig == null) { + throw new IllegalArgumentException("MetaClientConfig cannot be null."); + } + LOG.info("Creating MetaClient for LeaderElectionClient"); + if (MetaClientConfig.StoreType.ZOOKEEPER.equals(metaClientConfig.getStoreType())) { + ZkMetaClientConfig zkMetaClientConfig = new ZkMetaClientConfig.ZkMetaClientConfigBuilder().setConnectionAddress( + metaClientConfig.getConnectionAddress()).setZkSerializer((new LeaderInfoSerializer())).build(); + _metaClient = new ZkMetaClientFactory().getMetaClient(zkMetaClientConfig); + _metaClient.connect(); + } else { + throw new MetaClientException("Unsupported store type: " + metaClientConfig.getStoreType()); + } } /** * Construct a LeaderElectionClient using a user passed in MetaClient object - * When MetaClient is auto closed be cause of being disconnected and auto retry connection timed out, user + * When MetaClient is auto closed because of being disconnected and auto retry connection timed out, user * will need to create a new MetaClient and a new LeaderElectionClient instance. * * @param metaClient metaClient object to be used. */ - public LeaderElectionClient(MetaClientInterface metaClient, String participant) { - + public LeaderElectionClient(MetaClientInterface<LeaderInfo> metaClient, String participant) { + throw new UnsupportedOperationException("Not supported yet."); } /** * Returns true if current participant is the current leadership. */ public boolean isLeader(String leaderPath) { - return false; + return getLeader(leaderPath).equalsIgnoreCase(_participant); } /** @@ -79,10 +118,11 @@ public class LeaderElectionClient { * The Leader Election client maintains and elect an active leader from the participant pool. * * @param leaderPath The path for leader election. - * @return boolean indicating if the operation is succeeded. + * @throws RuntimeException if the operation is not succeeded. */ - public boolean joinLeaderElectionParticipantPool(String leaderPath) { - return false; + public void joinLeaderElectionParticipantPool(String leaderPath) { + // TODO: create participant entry + subscribeAndTryCreateLeaderEntry(leaderPath); } /** @@ -91,10 +131,46 @@ public class LeaderElectionClient { * * @param leaderPath The path for leader election. * @param userInfo Any additional information to associate with this participant. - * @return boolean indicating if the operation is succeeded. + * @throws RuntimeException if the operation is not succeeded. */ - public boolean joinLeaderElectionParticipantPool(String leaderPath, Object userInfo) { - return false; + public void joinLeaderElectionParticipantPool(String leaderPath, LeaderInfo userInfo) { + // TODO: create participant entry with info + subscribeAndTryCreateLeaderEntry(leaderPath); + } + + private void subscribeAndTryCreateLeaderEntry(String leaderPath) { + _metaClient.subscribeDataChange(leaderPath + LEADER_ENTRY_KEY, _reElectListener, false); + LeaderInfo leaderInfo = new LeaderInfo(LEADER_ENTRY_KEY); + leaderInfo.setLeaderName(_participant); + + try { + // try to create leader entry, assuming leader election group node is already there + _metaClient.create(leaderPath + LEADER_ENTRY_KEY, leaderInfo, MetaClientInterface.EntryMode.EPHEMERAL); + } catch (MetaClientNodeExistsException ex) { + LOG.info("Already a leader for group {}", leaderPath); + } catch (MetaClientNoNodeException ex) { + try { + // try to create leader path root entry + _metaClient.create(leaderPath, null); + } catch (MetaClientNodeExistsException ignored) { + // root entry created by other client, ignore + } catch (MetaClientNoNodeException e) { + // Parent entry of user provided leader election group path missing. + // (e.g. `/a/b` not created in user specified leader election group path /a/b/c/LeaderGroup) + throw new MetaClientException("Parent entry in leaderGroup path" + leaderPath + " does not exist."); + } + try { + // try to create leader node again. + _metaClient.create(leaderPath + LEADER_ENTRY_KEY, leaderInfo, MetaClientInterface.EntryMode.EPHEMERAL); + } catch (MetaClientNoNodeException e) { + // Leader group root entry is gone after we checked at outer catch block. + // Meaning other client removed the group. Throw ConcurrentModificationException. + throw new ConcurrentModificationException( + "Other client trying to modify the leader election group at the same time, please retry.", ex); + } + } + + _leaderGroups.add(leaderPath + LEADER_ENTRY_KEY); } /** @@ -106,23 +182,64 @@ public class LeaderElectionClient { * Throws exception if the participant is not in the pool. * * @param leaderPath The path for leader election. - * @return boolean indicating if the operation is succeeded. + * @throws RuntimeException if the operation is not succeeded. * - * @throws RuntimeException If the participant did not join participant pool via this client. // TODO: define exp type + * @throws RuntimeException If the participant did not join participant pool via this client. */ - public boolean exitLeaderElectionParticipantPool(String leaderPath) { - return false; + public void exitLeaderElectionParticipantPool(String leaderPath) { + _metaClient.unsubscribeDataChange(leaderPath + LEADER_ENTRY_KEY, _reElectListener); + // TODO: remove from pool folder + relinquishLeaderHelper(leaderPath, true); } /** - * Releases leadership for participant. + * Releases leadership for participant. Still stays in the participant pool. * * @param leaderPath The path for leader election. * * @throws RuntimeException if the leadership is not owned by this participant, or if the - * participant did not join participant pool via this client. // TODO: define exp type + * participant did not join participant pool via this client. */ public void relinquishLeader(String leaderPath) { + relinquishLeaderHelper(leaderPath, false); + } + + /** + * relinquishLeaderHelper and LeaderElectionParticipantPool if configured + * @param leaderPath + * @param exitLeaderElectionParticipantPool + */ + private void relinquishLeaderHelper(String leaderPath, Boolean exitLeaderElectionParticipantPool) { + String key = leaderPath + LEADER_ENTRY_KEY; + // if current client is in the group + if (!_leaderGroups.contains(key)) { + throw new MetaClientException("Participant is not in the leader election group"); + } + // remove leader path from leaderGroups after check if exiting the pool. + // to prevent a race condition in In Zk implementation: + // If there are delays in ZkClient event queue, it is possible the leader election client received leader + // deleted event after unsubscribeDataChange. We will need to remove it from in memory `leaderGroups` map before + // deleting ZNode. So that handler in ReElectListener won't recreate the leader node. + if (exitLeaderElectionParticipantPool) { + _leaderGroups.remove(leaderPath + LEADER_ENTRY_KEY); + } + // check if current participant is the leader + // read data and stats, check, and multi check + delete + ImmutablePair<LeaderInfo, MetaClientInterface.Stat> tup = _metaClient.getDataAndStat(key); + if (tup.left.getLeaderName().equalsIgnoreCase(_participant)) { + int expectedVersion = tup.right.getVersion(); + List<Op> ops = Arrays.asList(Op.check(key, expectedVersion), Op.delete(key, expectedVersion)); + //Execute transactional support on operations + List<OpResult> opResults = _metaClient.transactionOP(ops); + if (opResults.get(0).getType() == ERRORRESULT) { + if (isLeader(leaderPath)) { + // Participant re-elected as leader. + throw new ConcurrentModificationException("Concurrent operation, please retry"); + } else { + LOG.info("Someone else is already leader"); + } + } + } } /** @@ -133,6 +250,12 @@ public class LeaderElectionClient { * @throws RuntimeException when leader path does not exist. // TODO: define exp type */ public String getLeader(String leaderPath) { + LeaderInfo leaderInfo = _metaClient.get(leaderPath + LEADER_ENTRY_KEY); + return leaderInfo == null ? null : leaderInfo.getLeaderName(); + } + + public LeaderInfo getParticipantInfo(String leaderPath) { + // TODO: add getParticipantInfo impl return null; } @@ -158,14 +281,14 @@ public class LeaderElectionClient { * get's auto-deleted after TTL or session timeout) or a new leader comes up, it notifies all * participants who have been listening on entryChange event. * - * An listener will still be installed if the path does not exists yet. + * A listener will still be installed if the path does not exist yet. * * @param leaderPath The path for leader election that listener is interested for change. * @param listener An implementation of LeaderElectionListenerInterface - * @return an boolean value indicating if registration is success. + * @return A boolean value indicating if registration is success. */ - public boolean subscribeLeadershipChanges(String leaderPath, - LeaderElectionListenerInterface listener) { + public boolean subscribeLeadershipChanges(String leaderPath, LeaderElectionListenerInterface listener) { + //TODO: add converter class for LeaderElectionListenerInterface return false; } @@ -173,8 +296,41 @@ public class LeaderElectionClient { * @param leaderPath The path for leader election that listener is no longer interested for change. * @param listener An implementation of LeaderElectionListenerInterface */ - public void unsubscribeLeadershipChanges(String leaderPath, - LeaderElectionListenerInterface listener) { + public void unsubscribeLeadershipChanges(String leaderPath, LeaderElectionListenerInterface listener) { + } + + @Override + public void close() throws Exception { + + // exit all previous joined leader election groups + for (String leaderGroup : _leaderGroups) { + String leaderGroupPathName = + leaderGroup.substring(0, leaderGroup.length() - LEADER_ENTRY_KEY.length() /*remove '/LEADER' */); + exitLeaderElectionParticipantPool(leaderGroupPathName); + } + + // TODO: if last participant, remove folder + _metaClient.disconnect(); + } + + class ReElectListener implements DataChangeListener { + + @Override + public void handleDataChange(String key, Object data, ChangeType changeType) throws Exception { + if (changeType == ChangeType.ENTRY_CREATED) { + LOG.info("new leader {} for leader election group {}.", ((LeaderInfo) data).getLeaderName(), key); + } else if (changeType == ChangeType.ENTRY_DELETED) { + if (_leaderGroups.contains(key)) { + LeaderInfo lf = new LeaderInfo("LEADER"); + lf.setLeaderName(_participant); + try { + _metaClient.create(key, lf, MetaClientInterface.EntryMode.EPHEMERAL); + } catch (MetaClientNodeExistsException ex) { + LOG.info("Already a leader {} for leader election group {}.", ((LeaderInfo) data).getLeaderName(), key); + } + } + } + } } } diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderInfo.java b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderInfo.java index 9e817c1c9..ab0562502 100644 --- a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderInfo.java +++ b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderInfo.java @@ -19,27 +19,49 @@ package org.apache.helix.metaclient.recipes.leaderelection; * under the License. */ +import com.fasterxml.jackson.annotation.JsonCreator; +import com.fasterxml.jackson.annotation.JsonIgnore; +import com.fasterxml.jackson.annotation.JsonIgnoreProperties; +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.databind.annotation.JsonSerialize; import org.apache.helix.metaclient.datamodel.DataRecord; /** * This is the data represent leader election info of a leader election path. */ -public class LeaderInfo { +@JsonIgnoreProperties(ignoreUnknown = true) +@JsonSerialize(include = JsonSerialize.Inclusion.NON_NULL) +public class LeaderInfo extends DataRecord { - private String _leaderElectionGroupName; - private final DataRecord _record; + public LeaderInfo(DataRecord dataRecord) { + super(dataRecord); + } + @JsonCreator + public LeaderInfo(@JsonProperty("id") String id) { + super(id); + } - public LeaderInfo( String leaderElectionGroupName) { - _leaderElectionGroupName = leaderElectionGroupName; - _record = new DataRecord(_leaderElectionGroupName); + public LeaderInfo(LeaderInfo info, String id) { + super(info, id); } + public enum LeaderAttribute { LEADER_NAME, PARTICIPANTS } +@JsonIgnore(true) +public String getLeaderName() { + return getSimpleField("LEADER_NAME"); + } + + @JsonIgnore(true) + public void setLeaderName(String id) { + setSimpleField("LEADER_NAME", id); + } + } diff --git a/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderInfoSerializer.java b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderInfoSerializer.java new file mode 100644 index 000000000..fd6d256f6 --- /dev/null +++ b/meta-client/src/main/java/org/apache/helix/metaclient/recipes/leaderelection/LeaderInfoSerializer.java @@ -0,0 +1,42 @@ +package org.apache.helix.metaclient.recipes.leaderelection; + +import com.fasterxml.jackson.databind.DeserializationFeature; +import com.fasterxml.jackson.databind.MapperFeature; +import java.io.ByteArrayInputStream; +import org.apache.helix.zookeeper.datamodel.ZNRecord; +import org.apache.helix.zookeeper.datamodel.serializer.ZNRecordSerializer; +import org.apache.helix.zookeeper.util.GZipCompressionUtil; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + + +public class LeaderInfoSerializer extends ZNRecordSerializer { + + private static final Logger LOG = LoggerFactory.getLogger(LeaderInfoSerializer.class); + + @Override + public Object deserialize(byte[] bytes) { + if (bytes == null || bytes.length == 0) { + // reading a parent/null node + return null; + } + + ByteArrayInputStream bais = new ByteArrayInputStream(bytes); + + mapper.enable(MapperFeature.AUTO_DETECT_FIELDS); + mapper.enable(MapperFeature.AUTO_DETECT_SETTERS); + mapper.enable(DeserializationFeature.FAIL_ON_UNKNOWN_PROPERTIES); + try { + //decompress the data if its already compressed + if (GZipCompressionUtil.isCompressed(bytes)) { + byte[] uncompressedBytes = GZipCompressionUtil.uncompress(bais); + bais = new ByteArrayInputStream(uncompressedBytes); + } + + return mapper.readValue(bais, LeaderInfo.class); + } catch (Exception e) { + LOG.error("Exception during deserialization of bytes: {}", new String(bytes), e); + return null; + } + } +} diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/TestUtil.java b/meta-client/src/test/java/org/apache/helix/metaclient/TestUtil.java new file mode 100644 index 000000000..9b20c8e53 --- /dev/null +++ b/meta-client/src/test/java/org/apache/helix/metaclient/TestUtil.java @@ -0,0 +1,22 @@ +package org.apache.helix.metaclient; + +public class TestUtil { + public static final long WAIT_DURATION = 6 * 1000L; + public interface Verifier { + boolean verify() + throws Exception; + } + + public static boolean verify(Verifier verifier, long timeout) + throws Exception { + long start = System.currentTimeMillis(); + do { + boolean result = verifier.verify(); + if (result || (System.currentTimeMillis() - start) > timeout) { + return result; + } + Thread.sleep(50); + } while (true); + } + +} diff --git a/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java new file mode 100644 index 000000000..3a45d97ce --- /dev/null +++ b/meta-client/src/test/java/org/apache/helix/metaclient/recipes/leaderelection/TestLeaderElection.java @@ -0,0 +1,64 @@ +package org.apache.helix.metaclient.recipes.leaderelection; + +import org.apache.helix.metaclient.TestUtil; +import org.apache.helix.metaclient.factories.MetaClientConfig; +import org.apache.helix.metaclient.impl.zk.ZkMetaClientTestBase; +import org.testng.Assert; +import org.testng.annotations.Test; + + +public class TestLeaderElection extends ZkMetaClientTestBase { + + private static final String PARTICIPANT_NAME1 = "participant_1"; + private static final String PARTICIPANT_NAME2 = "participant_2"; + private static final String LEADER_PATH = "/LEADER_ELECTION_GROUP_1"; + + public LeaderElectionClient createLeaderElectionClient(String participantName) { + MetaClientConfig.StoreType storeType = MetaClientConfig.StoreType.ZOOKEEPER; + MetaClientConfig config = new MetaClientConfig.MetaClientConfigBuilder<>().setConnectionAddress(ZK_ADDR) + .setStoreType(storeType).build(); + return new LeaderElectionClient(config, participantName); + } + + @Test + public void testAcquireLeadership() throws Exception { + // create 2 clients representing 2 participants + LeaderElectionClient clt1 = createLeaderElectionClient(PARTICIPANT_NAME1); + LeaderElectionClient clt2 = createLeaderElectionClient(PARTICIPANT_NAME2); + + clt1.joinLeaderElectionParticipantPool(LEADER_PATH); + clt2.joinLeaderElectionParticipantPool(LEADER_PATH); + // First client joining the leader election group should be current leader + Assert.assertTrue(TestUtil.verify(() -> { + return (clt1.getLeader(LEADER_PATH) != null); + }, TestUtil.WAIT_DURATION)); + Assert.assertNotNull(clt1.getLeader(LEADER_PATH)); + Assert.assertEquals(clt1.getLeader(LEADER_PATH), clt2.getLeader(LEADER_PATH)); + Assert.assertEquals(clt1.getLeader(LEADER_PATH), PARTICIPANT_NAME1); + + // client 1 exit leader election group, and client 2 should be current leader. + clt1.exitLeaderElectionParticipantPool(LEADER_PATH); + Assert.assertTrue(TestUtil.verify(() -> { + return (clt1.getLeader(LEADER_PATH) != null); + }, TestUtil.WAIT_DURATION)); + Assert.assertTrue(TestUtil.verify(() -> { + return (clt1.getLeader(LEADER_PATH).equals(PARTICIPANT_NAME2)); + }, TestUtil.WAIT_DURATION)); + + // client1 join and client2 leave. client 1 should be leader. + clt1.joinLeaderElectionParticipantPool(LEADER_PATH); + clt2.exitLeaderElectionParticipantPool(LEADER_PATH); + Assert.assertTrue(TestUtil.verify(() -> { + return (clt1.getLeader(LEADER_PATH) != null); + }, TestUtil.WAIT_DURATION)); + Assert.assertTrue(TestUtil.verify(() -> { + return (clt1.getLeader(LEADER_PATH).equals(PARTICIPANT_NAME1)); + }, TestUtil.WAIT_DURATION)); + Assert.assertTrue(clt1.isLeader(LEADER_PATH)); + Assert.assertFalse(clt2.isLeader(LEADER_PATH)); + + clt1.close(); + clt2.close(); + } + +} diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/datamodel/serializer/ZNRecordSerializer.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/datamodel/serializer/ZNRecordSerializer.java index 5838cfad7..0cc3215bb 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/datamodel/serializer/ZNRecordSerializer.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/datamodel/serializer/ZNRecordSerializer.java @@ -40,7 +40,7 @@ import org.slf4j.LoggerFactory; public class ZNRecordSerializer implements ZkSerializer { private static final Logger LOG = LoggerFactory.getLogger(ZNRecordSerializer.class); - private static ObjectMapper mapper = new ObjectMapper() + protected static ObjectMapper mapper = new ObjectMapper() // TODO: remove it after upgrading ZNRecord's annotations to Jackson 2 .setAnnotationIntrospector(new CodehausJacksonIntrospector()); diff --git a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java index 376409231..589425462 100644 --- a/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java +++ b/zookeeper-api/src/main/java/org/apache/helix/zookeeper/zkclient/ZkConnection.java @@ -260,6 +260,7 @@ public class ZkConnection implements IZkConnection { private void lookupGetChildrenMethod() { _getChildrenMethod = doLookUpGetChildrenMethod(); + System.out.println(" ZkSystemPropertyKeys.ZK_GETCHILDREN_PAGINATION_DISABLED " + GETCHILDREN_PAGINATION_DISABLED); LOG.info("Pagination config {}={}, method to be invoked: {}", ZkSystemPropertyKeys.ZK_GETCHILDREN_PAGINATION_DISABLED, GETCHILDREN_PAGINATION_DISABLED, _getChildrenMethod.getName());