Replace HelixDataAccessor.createProperty with more specific methods
Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/9ebf3e9c Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/9ebf3e9c Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/9ebf3e9c Branch: refs/heads/helix-0.6.x Commit: 9ebf3e9c93cf59a0244553ba3a84df140de8699c Parents: 1b3c5c2 Author: Junkai Xue <j...@linkedin.com> Authored: Sat Jan 28 17:20:55 2017 -0800 Committer: Junkai Xue <j...@linkedin.com> Committed: Wed Feb 1 20:19:43 2017 -0800 ---------------------------------------------------------------------- .../org/apache/helix/HelixDataAccessor.java | 18 +++---- .../org/apache/helix/PropertyPathBuilder.java | 4 ++ .../manager/zk/DistributedLeaderElection.java | 3 +- .../apache/helix/manager/zk/ZKHelixAdmin.java | 4 +- .../helix/manager/zk/ZKHelixDataAccessor.java | 56 ++++++++++++++------ .../apache/helix/manager/zk/ZKHelixManager.java | 19 +------ .../src/test/java/org/apache/helix/Mocks.java | 30 ++++++++--- .../helix/integration/TestSchedulerMessage.java | 4 +- .../integration/TestSchedulerMsgUsingQueue.java | 4 +- 9 files changed, 87 insertions(+), 55 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/9ebf3e9c/helix-core/src/main/java/org/apache/helix/HelixDataAccessor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/HelixDataAccessor.java b/helix-core/src/main/java/org/apache/helix/HelixDataAccessor.java index ad2c4bc..0f48539 100644 --- a/helix-core/src/main/java/org/apache/helix/HelixDataAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/HelixDataAccessor.java @@ -23,6 +23,11 @@ import java.util.List; import java.util.Map; import org.I0Itec.zkclient.DataUpdater; +import org.apache.helix.model.LiveInstance; +import org.apache.helix.model.Message; +import org.apache.helix.model.PauseSignal; +import org.apache.helix.model.StateModelDefinition; + /** * Interface used to interact with Helix Data Types like IdealState, Config, @@ -30,15 +35,10 @@ import org.I0Itec.zkclient.DataUpdater; * type. See {@link PropertyKey.Builder} to get more information on building a propertyKey. */ public interface HelixDataAccessor { - /** - * Create a helix property only if it does not exist. - * @param key - * @param value - * @return true if creation was successful. False if already exists or if it - * failed to create - */ - - <T extends HelixProperty> boolean createProperty(PropertyKey key, T value); + boolean createStateModelDef(StateModelDefinition stateModelDef); + boolean createControllerMessage(Message message); + boolean createControllerLeader(LiveInstance leader); + boolean createPause(PauseSignal pauseSignal); /** * Set a property, overwrite if it exists and creates if not exists. This api http://git-wip-us.apache.org/repos/asf/helix/blob/9ebf3e9c/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java b/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java index 13ddb08..30fc0a9 100644 --- a/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java +++ b/helix-core/src/main/java/org/apache/helix/PropertyPathBuilder.java @@ -295,6 +295,10 @@ public class PropertyPathBuilder { return String.format("/%s/CONTROLLER/MESSAGES", clusterName); } + public static String controllerMessage(String clusterName, String messageId) { + return String.format("/%s/CONTROLLER/MESSAGES/%s", clusterName, messageId); + } + public static String controllerStatusUpdate(String clusterName) { return String.format("/%s/CONTROLLER/STATUSUPDATES", clusterName); } http://git-wip-us.apache.org/repos/asf/helix/blob/9ebf3e9c/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java index d281ae4..51fdc42 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/DistributedLeaderElection.java @@ -35,6 +35,7 @@ import org.apache.helix.model.LeaderHistory; import org.apache.helix.model.LiveInstance; import org.apache.log4j.Logger; + /** * do distributed leader election */ @@ -121,7 +122,7 @@ public class DistributedLeaderElection implements ControllerChangeListener { leader.setLiveInstance(ManagementFactory.getRuntimeMXBean().getName()); leader.setSessionId(manager.getSessionId()); leader.setHelixVersion(manager.getVersion()); - boolean success = accessor.createProperty(keyBuilder.controllerLeader(), leader); + boolean success = accessor.createControllerLeader(leader); if (success) { return true; } else { http://git-wip-us.apache.org/repos/asf/helix/blob/9ebf3e9c/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java index 1eaa3c6..b7cb6d4 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixAdmin.java @@ -58,7 +58,6 @@ import org.apache.helix.model.ConstraintItem; import org.apache.helix.model.CurrentState; import org.apache.helix.model.ExternalView; import org.apache.helix.model.HelixConfigScope; -import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty; import org.apache.helix.model.IdealState; import org.apache.helix.model.IdealState.RebalanceMode; import org.apache.helix.model.InstanceConfig; @@ -73,6 +72,7 @@ import org.apache.helix.tools.DefaultIdealStateCalculator; import org.apache.helix.util.RebalanceUtil; import org.apache.log4j.Logger; + public class ZKHelixAdmin implements HelixAdmin { public static final String CONNECTION_TIMEOUT = "helixAdmin.timeOutInSec"; private final ZkClient _zkClient; @@ -329,7 +329,7 @@ public class ZKHelixAdmin implements HelixAdmin { if (enabled) { accessor.removeProperty(keyBuilder.pause()); } else { - accessor.createProperty(keyBuilder.pause(), new PauseSignal("pause")); + accessor.createPause(new PauseSignal("pause")); } } http://git-wip-us.apache.org/repos/asf/helix/blob/9ebf3e9c/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java index ed434a1..791c6d8 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixDataAccessor.java @@ -36,14 +36,20 @@ import org.apache.helix.HelixProperty; import org.apache.helix.InstanceType; import org.apache.helix.PropertyKey; import org.apache.helix.PropertyKey.Builder; +import org.apache.helix.PropertyPathBuilder; import org.apache.helix.PropertyType; import org.apache.helix.ZNRecord; import org.apache.helix.ZNRecordAssembler; import org.apache.helix.ZNRecordBucketizer; import org.apache.helix.ZNRecordUpdater; +import org.apache.helix.model.LiveInstance; +import org.apache.helix.model.Message; +import org.apache.helix.model.PauseSignal; +import org.apache.helix.model.StateModelDefinition; import org.apache.log4j.Logger; import org.apache.zookeeper.data.Stat; + public class ZKHelixDataAccessor implements HelixDataAccessor { private static Logger LOG = Logger.getLogger(ZKHelixDataAccessor.class); private final BaseDataAccessor<ZNRecord> _baseDataAccessor; @@ -65,23 +71,43 @@ public class ZKHelixDataAccessor implements HelixDataAccessor { _propertyKeyBuilder = new PropertyKey.Builder(_clusterName); } - @Override - public <T extends HelixProperty> boolean createProperty(PropertyKey key, T value) { - PropertyType type = key.getType(); - String path = key.getPath(); - int options = constructOptions(type); - boolean success = false; - switch (type) { - case STATEMODELDEFS: - if (value.isValid()) { - success = _baseDataAccessor.create(path, value.getRecord(), options); + @Override public boolean createStateModelDef(StateModelDefinition stateModelDef) { + String path = PropertyPathBuilder.stateModelDef(_clusterName, stateModelDef.getId()); + HelixProperty property = + getProperty(new PropertyKey.Builder(_clusterName).stateModelDef(stateModelDef.getId())); + + // Set new StateModelDefinition if it is different from old one. + if (property != null) { + // StateModelDefinition need to be updated + if (!new StateModelDefinition(property.getRecord()).equals(stateModelDef)) { + return stateModelDef.isValid() && _baseDataAccessor + .set(path, stateModelDef.getRecord(), AccessOption.PERSISTENT); } - break; - default: - success = _baseDataAccessor.create(path, value.getRecord(), options); - break; + } else { + // StateModeDefinition does not exist + return stateModelDef.isValid() && _baseDataAccessor + .create(path, stateModelDef.getRecord(), AccessOption.PERSISTENT); } - return success; + // StateModelDefinition exists but not need to be updated + return true; + } + + @Override + public boolean createControllerMessage(Message message) { + return _baseDataAccessor.create(PropertyPathBuilder.controllerMessage(_clusterName, message.getMsgId()), + message.getRecord(), AccessOption.PERSISTENT); + } + + @Override + public boolean createControllerLeader(LiveInstance leader) { + return _baseDataAccessor.create(PropertyPathBuilder.controllerLeader(_clusterName), leader.getRecord(), + AccessOption.EPHEMERAL); + } + + @Override + public boolean createPause(PauseSignal pauseSignal) { + return _baseDataAccessor.create( + PropertyPathBuilder.pause(_clusterName), pauseSignal.getRecord(), AccessOption.PERSISTENT); } @Override http://git-wip-us.apache.org/repos/asf/helix/blob/9ebf3e9c/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java index 25eb653..bef2eac 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java @@ -462,24 +462,7 @@ public class ZKHelixManager implements HelixManager, IZkStateListener { */ private void addBuiltInStateModelDefinitions() { for (BuiltInStateModelDefinitions def : BuiltInStateModelDefinitions.values()) { - String path = String - .format("/%s/STATEMODELDEFS/%s", _clusterName, def.getStateModelDefinition().getId()); - - HelixProperty property = _dataAccessor.getProperty(new PropertyKey.Builder(_clusterName) - .stateModelDef(def.getStateModelDefinition().getId())); - - // Set new StateModelDefinition if it is different from old one. - if (property != null) { - // StateModelDefinition need to be updated - if (!new StateModelDefinition(property.getRecord()).equals(def.getStateModelDefinition())) { - _baseDataAccessor - .set(path, def.getStateModelDefinition().getRecord(), AccessOption.PERSISTENT); - } - } else { - // StateModeDefinition does not exist - _baseDataAccessor - .create(path, def.getStateModelDefinition().getRecord(), AccessOption.PERSISTENT); - } + _dataAccessor.createStateModelDef(def.getStateModelDefinition()); } } http://git-wip-us.apache.org/repos/asf/helix/blob/9ebf3e9c/helix-core/src/test/java/org/apache/helix/Mocks.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/Mocks.java b/helix-core/src/test/java/org/apache/helix/Mocks.java index 0fb7d9e..7d27693 100644 --- a/helix-core/src/test/java/org/apache/helix/Mocks.java +++ b/helix-core/src/test/java/org/apache/helix/Mocks.java @@ -38,7 +38,10 @@ import org.apache.helix.messaging.handling.HelixTaskResult; import org.apache.helix.messaging.handling.MessageHandlerFactory; import org.apache.helix.messaging.handling.MessageTask; import org.apache.helix.model.HelixConfigScope.ConfigScopeProperty; +import org.apache.helix.model.LiveInstance; import org.apache.helix.model.Message; +import org.apache.helix.model.PauseSignal; +import org.apache.helix.model.StateModelDefinition; import org.apache.helix.participant.StateMachineEngine; import org.apache.helix.participant.statemachine.StateModel; import org.apache.helix.participant.statemachine.StateModelInfo; @@ -46,6 +49,7 @@ import org.apache.helix.participant.statemachine.Transition; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.zookeeper.data.Stat; + public class Mocks { public static class MockBaseDataAccessor implements BaseDataAccessor<ZNRecord> { Map<String, ZNRecord> map = new HashMap<String, ZNRecord>(); @@ -471,6 +475,26 @@ public class Mocks { Map<String, ZNRecord> map = new HashMap<String, ZNRecord>(); @Override + public boolean createStateModelDef(StateModelDefinition stateModelDef) { + return false; + } + + @Override + public boolean createControllerMessage(Message message) { + return false; + } + + @Override + public boolean createControllerLeader(LiveInstance leader) { + return false; + } + + @Override + public boolean createPause(PauseSignal pauseSignal) { + return false; + } + + @Override public boolean setProperty(PropertyKey key, HelixProperty value) { String path = key.getPath(); data.put(path, value.getRecord()); @@ -570,12 +594,6 @@ public class Mocks { } @Override - public <T extends HelixProperty> boolean createProperty(PropertyKey key, T value) { - // TODO Auto-generated method stub - return false; - } - - @Override public <T extends HelixProperty> boolean[] createChildren(List<PropertyKey> keys, List<T> children) { // TODO Auto-generated method stub http://git-wip-us.apache.org/repos/asf/helix/blob/9ebf3e9c/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java index 17c5d62..8c239f5 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMessage.java @@ -58,6 +58,7 @@ import org.codehaus.jackson.map.SerializationConfig; import org.testng.Assert; import org.testng.annotations.Test; + public class TestSchedulerMessage extends ZkStandAloneCMTestBase { public static class MockAsyncCallback extends AsyncCallback { @@ -232,8 +233,7 @@ public class TestSchedulerMessage extends ZkStandAloneCMTestBase { HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor(); Builder keyBuilder = helixDataAccessor.keyBuilder(); - helixDataAccessor.createProperty(keyBuilder.controllerMessage(schedulerMessage.getMsgId()), - schedulerMessage); + helixDataAccessor.createControllerMessage(schedulerMessage); for (int i = 0; i < 30; i++) { Thread.sleep(2000); http://git-wip-us.apache.org/repos/asf/helix/blob/9ebf3e9c/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgUsingQueue.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgUsingQueue.java b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgUsingQueue.java index d5b5680..e7deefa 100644 --- a/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgUsingQueue.java +++ b/helix-core/src/test/java/org/apache/helix/integration/TestSchedulerMsgUsingQueue.java @@ -38,6 +38,7 @@ import org.codehaus.jackson.map.SerializationConfig; import org.testng.Assert; import org.testng.annotations.Test; + public class TestSchedulerMsgUsingQueue extends ZkStandAloneCMTestBase { TestSchedulerMessage.TestMessagingHandlerFactory _factory = new TestSchedulerMessage.TestMessagingHandlerFactory(); @@ -91,8 +92,7 @@ public class TestSchedulerMsgUsingQueue extends ZkStandAloneCMTestBase { HelixDataAccessor helixDataAccessor = manager.getHelixDataAccessor(); PropertyKey.Builder keyBuilder = helixDataAccessor.keyBuilder(); - helixDataAccessor.createProperty(keyBuilder.controllerMessage(schedulerMessage.getMsgId()), - schedulerMessage); + helixDataAccessor.createControllerMessage(schedulerMessage); for (int i = 0; i < 30; i++) { Thread.sleep(2000);