Updated Branches: refs/heads/helix-0.6.2-release 06ca975d1 -> 4e881378c
[HELIX-380] Incompatibility issue with HELIX_PROPERTYSTORE, rb=18053 Project: http://git-wip-us.apache.org/repos/asf/helix/repo Commit: http://git-wip-us.apache.org/repos/asf/helix/commit/4e881378 Tree: http://git-wip-us.apache.org/repos/asf/helix/tree/4e881378 Diff: http://git-wip-us.apache.org/repos/asf/helix/diff/4e881378 Branch: refs/heads/helix-0.6.2-release Commit: 4e881378cd262505efa6ea6378cfcb0a88d2902f Parents: 06ca975 Author: zzhang <[email protected]> Authored: Thu Feb 13 16:19:46 2014 -0800 Committer: zzhang <[email protected]> Committed: Thu Feb 13 16:19:46 2014 -0800 ---------------------------------------------------------------------- .../apache/helix/manager/zk/ZKHelixManager.java | 7 +- .../helix/manager/zk/ZkBaseDataAccessor.java | 3 + .../manager/zk/ZkCacheBaseDataAccessor.java | 5 +- .../store/zk/AutoFallbackPropertyStore.java | 332 ++++++++++ .../store/zk/TestAutoFallbackPropertyStore.java | 625 +++++++++++++++++++ .../zk/TestZkManagerWithAutoFallbackStore.java | 115 ++++ 6 files changed, 1081 insertions(+), 6 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/helix/blob/4e881378/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java index c54c901..a5ca409 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZKHelixManager.java @@ -70,6 +70,7 @@ import org.apache.helix.model.builder.ConfigScopeBuilder; import org.apache.helix.monitoring.ZKPathDataDumpTask; import org.apache.helix.participant.HelixStateMachineEngine; import org.apache.helix.participant.StateMachineEngine; +import org.apache.helix.store.zk.AutoFallbackPropertyStore; import org.apache.helix.store.zk.ZkHelixPropertyStore; import org.apache.log4j.Logger; import org.apache.zookeeper.Watcher.Event.KeeperState; @@ -624,10 +625,10 @@ public class ZKHelixManager implements HelixManager, IZkStateListener { if (_helixPropertyStore == null) { String path = PropertyPathConfig.getPath(PropertyType.PROPERTYSTORE, _clusterName); - + String fallbackPath = String.format("/%s/%s", _clusterName, "HELIX_PROPERTYSTORE"); _helixPropertyStore = - new ZkHelixPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(_zkclient), path, - null); + new AutoFallbackPropertyStore<ZNRecord>(new ZkBaseDataAccessor<ZNRecord>(_zkclient), path, + fallbackPath); } return _helixPropertyStore; http://git-wip-us.apache.org/repos/asf/helix/blob/4e881378/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java index 2ba76a3..231ce77 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkBaseDataAccessor.java @@ -82,6 +82,9 @@ public class ZkBaseDataAccessor<T> implements BaseDataAccessor<T> { private final ZkClient _zkClient; public ZkBaseDataAccessor(ZkClient zkClient) { + if (zkClient == null) { + throw new NullPointerException("zkclient is null"); + } _zkClient = zkClient; } http://git-wip-us.apache.org/repos/asf/helix/blob/4e881378/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java index 5e7355b..1206d20 100644 --- a/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java +++ b/helix-core/src/main/java/org/apache/helix/manager/zk/ZkCacheBaseDataAccessor.java @@ -516,11 +516,10 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> { @Override public boolean[] exists(List<String> paths, int options) { final int size = paths.size(); - List<String> serverPaths = prependChroot(paths); boolean exists[] = new boolean[size]; for (int i = 0; i < size; i++) { - exists[i] = exists(serverPaths.get(i), options); + exists[i] = exists(paths.get(i), options); } return exists; } @@ -664,7 +663,7 @@ public class ZkCacheBaseDataAccessor<T> implements HelixPropertyStore<T> { List<String> paths = new ArrayList<String>(); for (String childName : childNames) { - String path = parentPath + "/" + childName; + String path = parentPath.equals("/") ? "/" + childName : parentPath + "/" + childName; paths.add(path); } http://git-wip-us.apache.org/repos/asf/helix/blob/4e881378/helix-core/src/main/java/org/apache/helix/store/zk/AutoFallbackPropertyStore.java ---------------------------------------------------------------------- diff --git a/helix-core/src/main/java/org/apache/helix/store/zk/AutoFallbackPropertyStore.java b/helix-core/src/main/java/org/apache/helix/store/zk/AutoFallbackPropertyStore.java new file mode 100644 index 0000000..02fa5bc --- /dev/null +++ b/helix-core/src/main/java/org/apache/helix/store/zk/AutoFallbackPropertyStore.java @@ -0,0 +1,332 @@ +package org.apache.helix.store.zk; + +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.I0Itec.zkclient.DataUpdater; +import org.apache.helix.AccessOption; +import org.apache.helix.manager.zk.ZkBaseDataAccessor; +import org.apache.log4j.Logger; +import org.apache.zookeeper.data.Stat; + +/** + * Property store that does auto fallback to an old location. + * Assuming no concurrent updates + */ +public class AutoFallbackPropertyStore<T> extends ZkHelixPropertyStore<T> { + private static Logger LOG = Logger.getLogger(AutoFallbackPropertyStore.class); + + private final ZkHelixPropertyStore<T> _fallbackStore; + + public AutoFallbackPropertyStore(ZkBaseDataAccessor<T> accessor, String root, String fallbackRoot) { + super(accessor, root, null); + + if (accessor.exists(fallbackRoot, 0)) { + _fallbackStore = new ZkHelixPropertyStore<T>(accessor, fallbackRoot, null); + } else { + LOG.info("fallbackRoot: " + fallbackRoot + + " doesn't exist, skip creating fallback property store"); + _fallbackStore = null; + } + + } + + @Override + public boolean update(String path, DataUpdater<T> updater, int options) { + if (_fallbackStore == null) { + return super.update(path, updater, options); + } else { + Stat stat = super.getStat(path, options); + if (stat == null) { + // create znode at new location with fallback-value + T fallbackValue = _fallbackStore.get(path, null, options); + boolean succeed = super.create(path, fallbackValue, AccessOption.PERSISTENT); + if (!succeed) { + LOG.error("Can't update " + path + " since there are concurrent updates"); + return false; + } + } + return super.update(path, updater, options); + } + } + + @Override + public boolean exists(String path, int options) { + if (_fallbackStore == null) { + return super.exists(path, options); + } else { + boolean exist = super.exists(path, options); + if (!exist) { + exist = _fallbackStore.exists(path, options); + } + return exist; + } + } + + @Override + public boolean remove(String path, int options) { + if (_fallbackStore != null) { + _fallbackStore.remove(path, options); + } + return super.remove(path, options); + } + + @Override + public T get(String path, Stat stat, int options) { + if (_fallbackStore == null) { + return super.get(path, stat, options); + } else { + T value = super.get(path, stat, options); + if (value == null) { + value = _fallbackStore.get(path, stat, options); + } + + return value; + } + } + + @Override + public Stat getStat(String path, int options) { + if (_fallbackStore == null) { + return super.getStat(path, options); + } else { + Stat stat = super.getStat(path, options); + + if (stat == null) { + stat = _fallbackStore.getStat(path, options); + } + return stat; + } + } + + @Override + public boolean[] updateChildren(List<String> paths, List<DataUpdater<T>> updaters, int options) { + if (_fallbackStore == null) { + return super.updateChildren(paths, updaters, options); + } else { + Stat[] stats = super.getStats(paths, options); + Map<String, Integer> fallbackMap = new HashMap<String, Integer>(); + Map<String, Integer> updateMap = new HashMap<String, Integer>(); + for (int i = 0; i < paths.size(); i++) { + String path = paths.get(i); + if (stats[i] == null) { + fallbackMap.put(path, i); + } else { + updateMap.put(path, i); + } + } + + if (fallbackMap.size() > 0) { + List<String> fallbackPaths = new ArrayList<String>(fallbackMap.keySet()); + List<T> fallbackValues = _fallbackStore.get(fallbackPaths, null, options); + boolean createSucceed[] = + super.createChildren(fallbackPaths, fallbackValues, AccessOption.PERSISTENT); + + for (int i = 0; i < fallbackPaths.size(); i++) { + String fallbackPath = fallbackPaths.get(i); + if (createSucceed[i]) { + updateMap.put(fallbackPath, fallbackMap.get(fallbackPath)); + } else { + LOG.error("Can't update " + fallbackPath + " since there are concurrent updates"); + } + } + } + + boolean succeed[] = new boolean[paths.size()]; // all init'ed to false + if (updateMap.size() > 0) { + List<String> updatePaths = new ArrayList<String>(updateMap.keySet()); + List<DataUpdater<T>> subUpdaters = new ArrayList<DataUpdater<T>>(); + for (int i = 0; i < updatePaths.size(); i++) { + String updatePath = updatePaths.get(i); + subUpdaters.add(updaters.get(updateMap.get(updatePath))); + } + + boolean updateSucceed[] = super.updateChildren(updatePaths, subUpdaters, options); + for (int i = 0; i < updatePaths.size(); i++) { + String updatePath = updatePaths.get(i); + if (updateSucceed[i]) { + succeed[updateMap.get(updatePath)] = true; + } + } + } + + return succeed; + } + } + + @Override + public boolean[] exists(List<String> paths, int options) { + if (_fallbackStore == null) { + return super.exists(paths, options); + } else { + boolean[] exists = super.exists(paths, options); + + Map<String, Integer> fallbackMap = new HashMap<String, Integer>(); + for (int i = 0; i < paths.size(); i++) { + boolean exist = exists[i]; + if (!exist) { + fallbackMap.put(paths.get(i), i); + } + } + + if (fallbackMap.size() > 0) { + List<String> fallbackPaths = new ArrayList<String>(fallbackMap.keySet()); + + boolean[] fallbackExists = _fallbackStore.exists(fallbackPaths, options); + for (int i = 0; i < fallbackPaths.size(); i++) { + String fallbackPath = fallbackPaths.get(i); + int j = fallbackMap.get(fallbackPath); + exists[j] = fallbackExists[i]; + } + } + + return exists; + } + } + + @Override + public boolean[] remove(List<String> paths, int options) { + if (_fallbackStore != null) { + _fallbackStore.remove(paths, options); + } + return super.remove(paths, options); + } + + @Override + public List<T> get(List<String> paths, List<Stat> stats, int options) { + if (_fallbackStore == null) { + return super.get(paths, stats, options); + } else { + List<T> values = super.get(paths, stats, options); + + Map<String, Integer> fallbackMap = new HashMap<String, Integer>(); + for (int i = 0; i < paths.size(); i++) { + T value = values.get(i); + if (value == null) { + fallbackMap.put(paths.get(i), i); + } + } + + if (fallbackMap.size() > 0) { + List<String> fallbackPaths = new ArrayList<String>(fallbackMap.keySet()); + List<Stat> fallbackStats = new ArrayList<Stat>(); + List<T> fallbackValues = _fallbackStore.get(fallbackPaths, fallbackStats, options); + for (int i = 0; i < fallbackPaths.size(); i++) { + String fallbackPath = fallbackPaths.get(i); + int j = fallbackMap.get(fallbackPath); + values.set(j, fallbackValues.get(i)); + if (stats != null) { + stats.set(j, fallbackStats.get(i)); + } + } + } + + return values; + } + } + + @Override + public Stat[] getStats(List<String> paths, int options) { + if (_fallbackStore == null) { + return super.getStats(paths, options); + } else { + Stat[] stats = super.getStats(paths, options); + + Map<String, Integer> fallbackMap = new HashMap<String, Integer>(); + for (int i = 0; i < paths.size(); i++) { + Stat stat = stats[i]; + if (stat == null) { + fallbackMap.put(paths.get(i), i); + } + } + + if (fallbackMap.size() > 0) { + List<String> fallbackPaths = new ArrayList<String>(fallbackMap.keySet()); + + Stat[] fallbackStats = _fallbackStore.getStats(fallbackPaths, options); + for (int i = 0; i < fallbackPaths.size(); i++) { + String fallbackPath = fallbackPaths.get(i); + int j = fallbackMap.get(fallbackPath); + stats[j] = fallbackStats[i]; + } + } + + return stats; + } + } + + @Override + public List<String> getChildNames(String parentPath, int options) { + if (_fallbackStore == null) { + return super.getChildNames(parentPath, options); + } else { + List<String> childs = super.getChildNames(parentPath, options); + List<String> fallbackChilds = _fallbackStore.getChildNames(parentPath, options); + + if (childs == null && fallbackChilds == null) { + return null; + } + + // merge two child lists + Set<String> allChildSet = new HashSet<String>(); + if (childs != null) { + allChildSet.addAll(childs); + } + + if (fallbackChilds != null) { + allChildSet.addAll(fallbackChilds); + } + + List<String> allChilds = new ArrayList<String>(allChildSet); + return allChilds; + } + } + + @Override + public void start() { + if (_fallbackStore != null) { + _fallbackStore.start(); + } + } + + @Override + public void stop() { + if (_fallbackStore != null) { + _fallbackStore.stop(); + } + + super.stop(); + } + + @Override + public void reset() { + if (_fallbackStore != null) { + _fallbackStore.reset(); + } + + super.reset(); + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/4e881378/helix-core/src/test/java/org/apache/helix/store/zk/TestAutoFallbackPropertyStore.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/store/zk/TestAutoFallbackPropertyStore.java b/helix-core/src/test/java/org/apache/helix/store/zk/TestAutoFallbackPropertyStore.java new file mode 100644 index 0000000..d4cb658 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/store/zk/TestAutoFallbackPropertyStore.java @@ -0,0 +1,625 @@ +package org.apache.helix.store.zk; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.Date; +import java.util.List; + +import org.I0Itec.zkclient.DataUpdater; +import org.apache.helix.AccessOption; +import org.apache.helix.PropertyType; +import org.apache.helix.TestHelper; +import org.apache.helix.ZNRecord; +import org.apache.helix.ZkUnitTestBase; +import org.apache.helix.manager.zk.ZkBaseDataAccessor; +import org.apache.zookeeper.data.Stat; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class TestAutoFallbackPropertyStore extends ZkUnitTestBase { + + class MyDataUpdater implements DataUpdater<ZNRecord> { + final String _id; + + public MyDataUpdater(String id) { + _id = id; + } + + @Override + public ZNRecord update(ZNRecord currentData) { + if (currentData == null) { + currentData = new ZNRecord(_id); + } else { + currentData.setSimpleField("key", "value"); + } + return currentData; + } + } + + @Test + public void testSingleUpdateUsingFallbackPath() { + String className = TestHelper.getTestClassName(); + String methodName = TestHelper.getTestMethodName(); + String clusterName = className + "_" + methodName; + + System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); + String root = String.format("/%s/%s", clusterName, PropertyType.PROPERTYSTORE.name()); + String fallbackRoot = String.format("/%s/%s", clusterName, "HELIX_PROPERTYSTORE"); + ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient); + + // create 0 under fallbackRoot + for (int i = 0; i < 1; i++) { + String path = String.format("%s/%d", fallbackRoot, i); + baseAccessor.create(path, new ZNRecord(Integer.toString(i)), AccessOption.PERSISTENT); + } + + AutoFallbackPropertyStore<ZNRecord> store = + new AutoFallbackPropertyStore<ZNRecord>(baseAccessor, root, fallbackRoot); + + String path = String.format("/%d", 0); + Assert.assertFalse(baseAccessor.exists(String.format("%s%s", root, path), 0), + "Should not exist under new location"); + Assert.assertTrue(baseAccessor.exists(String.format("%s%s", fallbackRoot, path), 0), + "Should exist under fallback location"); + + boolean succeed = store.update(path, new MyDataUpdater("new0"), AccessOption.PERSISTENT); + Assert.assertTrue(succeed); + + // fallback path should remain unchanged + ZNRecord record = baseAccessor.get(String.format("%s%s", fallbackRoot, path), null, 0); + Assert.assertNotNull(record); + Assert.assertEquals(record.getId(), "0"); + Assert.assertNull(record.getSimpleField("key")); + + // new path should have simple field set + record = baseAccessor.get(String.format("%s%s", root, path), null, 0); + Assert.assertNotNull(record); + Assert.assertEquals(record.getId(), "0"); + Assert.assertNotNull(record.getSimpleField("key")); + Assert.assertEquals(record.getSimpleField("key"), "value"); + + System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); + } + + @Test + public void testSingleUpdateUsingNewPath() { + String className = TestHelper.getTestClassName(); + String methodName = TestHelper.getTestMethodName(); + String clusterName = className + "_" + methodName; + + System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); + String root = String.format("/%s/%s", clusterName, PropertyType.PROPERTYSTORE.name()); + String fallbackRoot = String.format("/%s/%s", clusterName, "HELIX_PROPERTYSTORE"); + ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient); + + // create 0 under both fallbackRoot and root + for (int i = 0; i < 1; i++) { + String path = String.format("%s/%d", root, i); + baseAccessor.create(path, new ZNRecord("new" + i), AccessOption.PERSISTENT); + + path = String.format("%s/%d", fallbackRoot, i); + baseAccessor.create(path, new ZNRecord(Integer.toString(i)), AccessOption.PERSISTENT); + } + + AutoFallbackPropertyStore<ZNRecord> store = + new AutoFallbackPropertyStore<ZNRecord>(baseAccessor, root, fallbackRoot); + + String path = String.format("/%d", 0); + Assert.assertTrue(baseAccessor.exists(String.format("%s%s", root, path), 0), + "Should exist under new location"); + Assert.assertTrue(baseAccessor.exists(String.format("%s%s", fallbackRoot, path), 0), + "Should exist under fallback location"); + + boolean succeed = store.update(path, new MyDataUpdater("0"), AccessOption.PERSISTENT); + Assert.assertTrue(succeed); + + ZNRecord record = baseAccessor.get(String.format("%s%s", fallbackRoot, path), null, 0); + Assert.assertNotNull(record); + Assert.assertEquals(record.getId(), "0"); + Assert.assertNull(record.getSimpleField("key")); + + record = baseAccessor.get(String.format("%s%s", root, path), null, 0); + Assert.assertNotNull(record); + Assert.assertEquals(record.getId(), "new0"); + Assert.assertNotNull(record.getSimpleField("key")); + Assert.assertEquals(record.getSimpleField("key"), "value"); + + System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); + } + + @Test + public void testMultiUpdateUsingFallbackPath() { + String className = TestHelper.getTestClassName(); + String methodName = TestHelper.getTestMethodName(); + String clusterName = className + "_" + methodName; + + System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); + String root = String.format("/%s/%s", clusterName, PropertyType.PROPERTYSTORE.name()); + String fallbackRoot = String.format("/%s/%s", clusterName, "HELIX_PROPERTYSTORE"); + ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient); + + // create 0-9 under fallbackRoot + for (int i = 0; i < 10; i++) { + String path = String.format("%s/%d", fallbackRoot, i); + baseAccessor.create(path, new ZNRecord(Integer.toString(i)), AccessOption.PERSISTENT); + } + + AutoFallbackPropertyStore<ZNRecord> store = + new AutoFallbackPropertyStore<ZNRecord>(baseAccessor, root, fallbackRoot); + + List<String> paths = new ArrayList<String>(); + List<DataUpdater<ZNRecord>> updaters = new ArrayList<DataUpdater<ZNRecord>>(); + for (int i = 0; i < 10; i++) { + String path = String.format("/%d", i); + Assert.assertFalse(baseAccessor.exists(String.format("%s%s", root, path), 0), + "Should not exist under new location"); + Assert.assertTrue(baseAccessor.exists(String.format("%s%s", fallbackRoot, path), 0), + "Should exist under fallback location"); + paths.add(path); + updaters.add(new MyDataUpdater("new" + i)); + } + + boolean succeed[] = store.updateChildren(paths, updaters, AccessOption.PERSISTENT); + for (int i = 0; i < 10; i++) { + Assert.assertTrue(succeed[i]); + String path = paths.get(i); + + // fallback path should remain unchanged + ZNRecord record = baseAccessor.get(String.format("%s%s", fallbackRoot, path), null, 0); + Assert.assertNotNull(record); + Assert.assertEquals(record.getId(), "" + i); + Assert.assertNull(record.getSimpleField("key")); + + // new path should have simple field set + record = baseAccessor.get(String.format("%s%s", root, path), null, 0); + Assert.assertNotNull(record); + Assert.assertEquals(record.getId(), "" + i); + Assert.assertNotNull(record.getSimpleField("key")); + Assert.assertEquals(record.getSimpleField("key"), "value"); + } + + System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); + } + + @Test + public void testMultiUpdateUsingNewath() { + String className = TestHelper.getTestClassName(); + String methodName = TestHelper.getTestMethodName(); + String clusterName = className + "_" + methodName; + + System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); + String root = String.format("/%s/%s", clusterName, PropertyType.PROPERTYSTORE.name()); + String fallbackRoot = String.format("/%s/%s", clusterName, "HELIX_PROPERTYSTORE"); + ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient); + + // create 0-9 under both fallbackRoot and new root + for (int i = 0; i < 10; i++) { + String path = String.format("%s/%d", fallbackRoot, i); + baseAccessor.create(path, new ZNRecord(Integer.toString(i)), AccessOption.PERSISTENT); + + path = String.format("%s/%d", root, i); + baseAccessor.create(path, new ZNRecord("new" + i), AccessOption.PERSISTENT); + } + + AutoFallbackPropertyStore<ZNRecord> store = + new AutoFallbackPropertyStore<ZNRecord>(baseAccessor, root, fallbackRoot); + + List<String> paths = new ArrayList<String>(); + List<DataUpdater<ZNRecord>> updaters = new ArrayList<DataUpdater<ZNRecord>>(); + for (int i = 0; i < 20; i++) { + String path = String.format("/%d", i); + if (i < 10) { + Assert.assertTrue(baseAccessor.exists(String.format("%s%s", root, path), 0), + "Should exist under new location"); + Assert.assertTrue(baseAccessor.exists(String.format("%s%s", fallbackRoot, path), 0), + "Should exist under fallback location"); + } else { + Assert.assertFalse(baseAccessor.exists(String.format("%s%s", root, path), 0), + "Should not exist under new location"); + Assert.assertFalse(baseAccessor.exists(String.format("%s%s", fallbackRoot, path), 0), + "Should not exist under fallback location"); + } + paths.add(path); + updaters.add(new MyDataUpdater("new" + i)); + } + + boolean succeed[] = store.updateChildren(paths, updaters, AccessOption.PERSISTENT); + for (int i = 0; i < 10; i++) { + Assert.assertTrue(succeed[i]); + String path = paths.get(i); + + // fallback path should remain unchanged + if (i < 10) { + ZNRecord record = baseAccessor.get(String.format("%s%s", fallbackRoot, path), null, 0); + Assert.assertNotNull(record); + Assert.assertEquals(record.getId(), "" + i); + Assert.assertNull(record.getSimpleField("key")); + } else { + Assert.assertFalse(baseAccessor.exists(String.format("%s%s", fallbackRoot, path), 0), + "Should not exist under fallback location"); + } + + // new path should have simple field set + ZNRecord record = baseAccessor.get(String.format("%s%s", root, path), null, 0); + Assert.assertNotNull(record); + Assert.assertEquals(record.getId(), "new" + i); + if (i < 10) { + Assert.assertNotNull(record.getSimpleField("key")); + Assert.assertEquals(record.getSimpleField("key"), "value"); + } else { + Assert.assertNull(record.getSimpleField("key")); + } + + } + + System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); + } + + @Test + public void testSingleSet() { + String className = TestHelper.getTestClassName(); + String methodName = TestHelper.getTestMethodName(); + String clusterName = className + "_" + methodName; + + System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); + String root = String.format("/%s/%s", clusterName, PropertyType.PROPERTYSTORE.name()); + String fallbackRoot = String.format("/%s/%s", clusterName, "HELIX_PROPERTYSTORE"); + ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient); + + // create 0 under fallbackRoot + for (int i = 0; i < 1; i++) { + String path = String.format("%s/%d", fallbackRoot, i); + baseAccessor.create(path, new ZNRecord(Integer.toString(i)), AccessOption.PERSISTENT); + } + + AutoFallbackPropertyStore<ZNRecord> store = + new AutoFallbackPropertyStore<ZNRecord>(baseAccessor, root, fallbackRoot); + + String path = String.format("/%d", 0); + Assert.assertFalse(baseAccessor.exists(String.format("%s%s", root, path), 0), + "Should not exist under new location"); + Assert.assertTrue(baseAccessor.exists(String.format("%s%s", fallbackRoot, path), 0), + "Should exist under fallback location"); + ZNRecord record = new ZNRecord("new0"); + boolean succeed = store.set(path, record, AccessOption.PERSISTENT); + Assert.assertTrue(succeed); + + record = baseAccessor.get(String.format("%s%s", fallbackRoot, path), null, 0); + Assert.assertNotNull(record); + Assert.assertEquals(record.getId(), "0"); + + record = baseAccessor.get(String.format("%s%s", root, path), null, 0); + Assert.assertNotNull(record); + Assert.assertEquals(record.getId(), "new0"); + + System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); + } + + @Test + public void testMultiSet() { + String className = TestHelper.getTestClassName(); + String methodName = TestHelper.getTestMethodName(); + String clusterName = className + "_" + methodName; + + System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); + String root = String.format("/%s/%s", clusterName, PropertyType.PROPERTYSTORE.name()); + String fallbackRoot = String.format("/%s/%s", clusterName, "HELIX_PROPERTYSTORE"); + ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient); + + // create 0-9 under fallbackRoot + for (int i = 0; i < 10; i++) { + String path = String.format("%s/%d", fallbackRoot, i); + baseAccessor.create(path, new ZNRecord(Integer.toString(i)), AccessOption.PERSISTENT); + } + + AutoFallbackPropertyStore<ZNRecord> store = + new AutoFallbackPropertyStore<ZNRecord>(baseAccessor, root, fallbackRoot); + + List<String> paths = new ArrayList<String>(); + List<ZNRecord> records = new ArrayList<ZNRecord>(); + for (int i = 0; i < 10; i++) { + String path = String.format("/%d", i); + Assert.assertFalse(baseAccessor.exists(String.format("%s%s", root, path), 0), + "Should not exist under new location"); + Assert.assertTrue(baseAccessor.exists(String.format("%s%s", fallbackRoot, path), 0), + "Should exist under fallback location"); + paths.add(path); + ZNRecord record = new ZNRecord("new" + i); + records.add(record); + } + + boolean succeed[] = store.setChildren(paths, records, AccessOption.PERSISTENT); + for (int i = 0; i < 10; i++) { + Assert.assertTrue(succeed[i]); + String path = String.format("/%d", i); + ZNRecord record = baseAccessor.get(String.format("%s%s", fallbackRoot, path), null, 0); + Assert.assertNotNull(record); + Assert.assertEquals(record.getId(), Integer.toString(i)); + + record = baseAccessor.get(String.format("%s%s", root, path), null, 0); + Assert.assertNotNull(record); + Assert.assertEquals(record.getId(), "new" + i); + } + + System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); + } + + @Test + public void testSingleGetOnFallbackPath() { + String className = TestHelper.getTestClassName(); + String methodName = TestHelper.getTestMethodName(); + String clusterName = className + "_" + methodName; + + System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); + String root = String.format("/%s/%s", clusterName, PropertyType.PROPERTYSTORE.name()); + String fallbackRoot = String.format("/%s/%s", clusterName, "HELIX_PROPERTYSTORE"); + ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient); + + // create 0 under fallbackRoot + for (int i = 0; i < 1; i++) { + String path = String.format("%s/%d", fallbackRoot, i); + baseAccessor.create(path, new ZNRecord(Integer.toString(i)), AccessOption.PERSISTENT); + } + + AutoFallbackPropertyStore<ZNRecord> store = + new AutoFallbackPropertyStore<ZNRecord>(baseAccessor, root, fallbackRoot); + + String path = String.format("/%d", 0); + Assert.assertFalse(baseAccessor.exists(String.format("%s%s", root, path), 0), + "Should not exist under new location"); + Assert.assertTrue(baseAccessor.exists(String.format("%s%s", fallbackRoot, path), 0), + "Should exist under fallback location"); + + // test single exist + boolean exist = store.exists(path, 0); + Assert.assertTrue(exist); + + // test single getStat + Stat stat = store.getStat(path, 0); + Assert.assertNotNull(stat); + + // test single get + ZNRecord record = store.get(path, null, 0); + Assert.assertNotNull(record); + Assert.assertEquals(record.getId(), "0"); + Assert.assertFalse(baseAccessor.exists(String.format("%s%s", root, path), 0), + "Should not exist under new location after get"); + + System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); + + } + + @Test + void testMultiGetOnFallbackPath() { + String className = TestHelper.getTestClassName(); + String methodName = TestHelper.getTestMethodName(); + String clusterName = className + "_" + methodName; + + System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); + String root = String.format("/%s/%s", clusterName, PropertyType.PROPERTYSTORE.name()); + String fallbackRoot = String.format("/%s/%s", clusterName, "HELIX_PROPERTYSTORE"); + ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient); + + // create 0-9 under fallbackRoot + for (int i = 0; i < 10; i++) { + String path = String.format("%s/%d", fallbackRoot, i); + baseAccessor.create(path, new ZNRecord(Integer.toString(i)), AccessOption.PERSISTENT); + } + + AutoFallbackPropertyStore<ZNRecord> store = + new AutoFallbackPropertyStore<ZNRecord>(baseAccessor, root, fallbackRoot); + + List<String> paths = new ArrayList<String>(); + for (int i = 0; i < 10; i++) { + String path = String.format("/%d", i); + Assert.assertFalse(baseAccessor.exists(String.format("%s%s", root, path), 0), + "Should not exist under new location"); + Assert.assertTrue(baseAccessor.exists(String.format("%s%s", fallbackRoot, path), 0), + "Should exist under fallback location"); + paths.add(path); + } + + // test multi-exist + boolean exists[] = store.exists(paths, 0); + for (int i = 0; i < paths.size(); i++) { + Assert.assertTrue(exists[i]); + } + + // test multi-getStat + Stat stats[] = store.getStats(paths, 0); + for (int i = 0; i < paths.size(); i++) { + Assert.assertNotNull(stats[i]); + } + + // test multi-get + List<ZNRecord> records = store.get(paths, null, 0); + Assert.assertNotNull(records); + Assert.assertEquals(records.size(), 10); + for (int i = 0; i < 10; i++) { + ZNRecord record = records.get(i); + String path = paths.get(i); + Assert.assertNotNull(record); + Assert.assertEquals(record.getId(), Integer.toString(i)); + Assert.assertFalse(baseAccessor.exists(String.format("%s%s", root, path), 0), + "Should not exist under new location after get"); + } + + System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); + } + + @Test + public void testFailOnSingleGet() { + String className = TestHelper.getTestClassName(); + String methodName = TestHelper.getTestMethodName(); + String clusterName = className + "_" + methodName; + + System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); + String root = String.format("/%s/%s", clusterName, PropertyType.PROPERTYSTORE.name()); + String fallbackRoot = String.format("/%s/%s", clusterName, "HELIX_PROPERTYSTORE"); + ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient); + + AutoFallbackPropertyStore<ZNRecord> store = + new AutoFallbackPropertyStore<ZNRecord>(baseAccessor, root, fallbackRoot); + + String path = String.format("/%d", 0); + Assert.assertFalse(baseAccessor.exists(String.format("%s%s", root, path), 0), + "Should not exist under new location"); + Assert.assertFalse(baseAccessor.exists(String.format("%s%s", fallbackRoot, path), 0), + "Should not exist under fallback location"); + + // test single exist + boolean exist = store.exists(path, 0); + Assert.assertFalse(exist); + + // test single getStat + Stat stat = store.getStat(path, 0); + Assert.assertNull(stat); + + // test single get + ZNRecord record = store.get(path, null, 0); + Assert.assertNull(record); + + System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); + } + + @Test + public void testFailOnMultiGet() { + String className = TestHelper.getTestClassName(); + String methodName = TestHelper.getTestMethodName(); + String clusterName = className + "_" + methodName; + + System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); + String root = String.format("/%s/%s", clusterName, PropertyType.PROPERTYSTORE.name()); + String fallbackRoot = String.format("/%s/%s", clusterName, "HELIX_PROPERTYSTORE"); + ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient); + + // create 0-9 under fallbackRoot + for (int i = 0; i < 10; i++) { + String path = String.format("%s/%d", fallbackRoot, i); + baseAccessor.create(path, new ZNRecord(Integer.toString(i)), AccessOption.PERSISTENT); + } + + AutoFallbackPropertyStore<ZNRecord> store = + new AutoFallbackPropertyStore<ZNRecord>(baseAccessor, root, fallbackRoot); + + List<String> paths = new ArrayList<String>(); + for (int i = 0; i < 20; i++) { + String path = String.format("/%d", i); + Assert.assertFalse(baseAccessor.exists(String.format("%s%s", root, path), 0), + "Should not exist under new location"); + if (i < 10) { + Assert.assertTrue(baseAccessor.exists(String.format("%s%s", fallbackRoot, path), 0), + "Should exist under fallback location"); + } else { + Assert.assertFalse(baseAccessor.exists(String.format("%s%s", fallbackRoot, path), 0), + "Should not exist under fallback location"); + } + paths.add(path); + } + + // test multi-exist + boolean exists[] = store.exists(paths, 0); + for (int i = 0; i < paths.size(); i++) { + if (i < 10) { + Assert.assertTrue(exists[i]); + } else { + Assert.assertFalse(exists[i]); + } + } + + // test multi-getStat + Stat stats[] = store.getStats(paths, 0); + for (int i = 0; i < paths.size(); i++) { + if (i < 10) { + Assert.assertNotNull(stats[i]); + } else { + Assert.assertNull(stats[i]); + } + } + + // test multi-get + List<ZNRecord> records = store.get(paths, null, 0); + Assert.assertNotNull(records); + Assert.assertEquals(records.size(), 20); + for (int i = 0; i < 20; i++) { + ZNRecord record = records.get(i); + String path = paths.get(i); + if (i < 10) { + Assert.assertNotNull(record); + Assert.assertEquals(record.getId(), Integer.toString(i)); + } else { + Assert.assertNull(record); + } + Assert.assertFalse(baseAccessor.exists(String.format("%s%s", root, path), 0), + "Should not exist under new location after get"); + } + + System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); + } + + @Test + public void testGetChildren() { + String className = TestHelper.getTestClassName(); + String methodName = TestHelper.getTestMethodName(); + String clusterName = className + "_" + methodName; + + System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); + String root = String.format("/%s/%s", clusterName, PropertyType.PROPERTYSTORE.name()); + String fallbackRoot = String.format("/%s/%s", clusterName, "HELIX_PROPERTYSTORE"); + ZkBaseDataAccessor<ZNRecord> baseAccessor = new ZkBaseDataAccessor<ZNRecord>(_gZkClient); + + // create 0-9 under fallbackRoot and 10-19 under root + for (int i = 0; i < 20; i++) { + + if (i < 10) { + String path = String.format("%s/%d", fallbackRoot, i); + baseAccessor.create(path, new ZNRecord(Integer.toString(i)), AccessOption.PERSISTENT); + } else { + String path = String.format("%s/%d", root, i); + baseAccessor.create(path, new ZNRecord(Integer.toString(i)), AccessOption.PERSISTENT); + } + } + + AutoFallbackPropertyStore<ZNRecord> store = + new AutoFallbackPropertyStore<ZNRecord>(baseAccessor, root, fallbackRoot); + + List<String> paths = new ArrayList<String>(); + for (int i = 0; i < 20; i++) { + String path = String.format("/%d", i); + if (i < 10) { + Assert.assertTrue(baseAccessor.exists(String.format("%s%s", fallbackRoot, path), 0), + "Should exist under fallback location"); + Assert.assertFalse(baseAccessor.exists(String.format("%s%s", root, path), 0), + "Should not exist under new location"); + + } else { + Assert.assertFalse(baseAccessor.exists(String.format("%s%s", fallbackRoot, path), 0), + "Should not exist under fallback location"); + Assert.assertTrue(baseAccessor.exists(String.format("%s%s", root, path), 0), + "Should exist under new location"); + + } + paths.add(path); + } + + List<String> childs = store.getChildNames("/", 0); + Assert.assertNotNull(childs); + Assert.assertEquals(childs.size(), 20); + for (int i = 0; i < 20; i++) { + String child = childs.get(i); + Assert.assertTrue(childs.contains(child)); + } + + List<ZNRecord> records = store.getChildren("/", null, 0); + Assert.assertNotNull(records); + Assert.assertEquals(records.size(), 20); + for (int i = 0; i < 20; i++) { + ZNRecord record = records.get(i); + Assert.assertNotNull(record); + String id = record.getId(); + Assert.assertTrue(childs.contains(id)); + } + + System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); + } +} http://git-wip-us.apache.org/repos/asf/helix/blob/4e881378/helix-core/src/test/java/org/apache/helix/store/zk/TestZkManagerWithAutoFallbackStore.java ---------------------------------------------------------------------- diff --git a/helix-core/src/test/java/org/apache/helix/store/zk/TestZkManagerWithAutoFallbackStore.java b/helix-core/src/test/java/org/apache/helix/store/zk/TestZkManagerWithAutoFallbackStore.java new file mode 100644 index 0000000..549ce70 --- /dev/null +++ b/helix-core/src/test/java/org/apache/helix/store/zk/TestZkManagerWithAutoFallbackStore.java @@ -0,0 +1,115 @@ +package org.apache.helix.store.zk; + +import java.util.Date; + +import org.I0Itec.zkclient.DataUpdater; +import org.apache.helix.AccessOption; +import org.apache.helix.BaseDataAccessor; +import org.apache.helix.HelixDataAccessor; +import org.apache.helix.TestHelper; +import org.apache.helix.ZNRecord; +import org.apache.helix.ZkUnitTestBase; +import org.apache.helix.integration.manager.MockParticipantManager; +import org.apache.helix.manager.zk.ZkBaseDataAccessor; +import org.testng.Assert; +import org.testng.annotations.Test; + +public class TestZkManagerWithAutoFallbackStore extends ZkUnitTestBase { + @Test + public void testBasic() throws Exception { + // Logger.getRootLogger().setLevel(Level.INFO); + String className = TestHelper.getTestClassName(); + String methodName = TestHelper.getTestMethodName(); + String clusterName = className + "_" + methodName; + int n = 2; + + System.out.println("START " + clusterName + " at " + new Date(System.currentTimeMillis())); + + TestHelper.setupCluster(clusterName, ZK_ADDR, 12918, // participant port + "localhost", // participant name prefix + "TestDB", // resource name prefix + 1, // resources + 32, // partitions per resource + n, // number of nodes + 2, // replicas + "MasterSlave", false); // do rebalance + + // start participants + MockParticipantManager[] participants = new MockParticipantManager[n]; + for (int i = 0; i < 1; i++) { + String instanceName = "localhost_" + (12918 + i); + + participants[i] = new MockParticipantManager(ZK_ADDR, clusterName, instanceName); + participants[i].syncStart(); + } + + // add some data to fallback path: HELIX_PROPERTYSTORE + BaseDataAccessor<ZNRecord> accessor = + participants[0].getHelixDataAccessor().getBaseDataAccessor(); + for (int i = 0; i < 10; i++) { + String path = String.format("/%s/HELIX_PROPERTYSTORE/%d", clusterName, i); + ZNRecord record = new ZNRecord("" + i); + record.setSimpleField("key1", "value1"); + accessor.set(path, record, AccessOption.PERSISTENT); + } + + ZkHelixPropertyStore<ZNRecord> store = participants[0].getHelixPropertyStore(); + + // read shall use fallback paths + for (int i = 0; i < 10; i++) { + String path = String.format("/%d", i); + ZNRecord record = store.get(path, null, 0); + Assert.assertNotNull(record); + Assert.assertEquals(record.getId(), "" + i); + Assert.assertNotNull(record.getSimpleField("key1")); + Assert.assertEquals(record.getSimpleField("key1"), "value1"); + } + + // update shall update new paths + for (int i = 0; i < 10; i++) { + String path = String.format("/%d", i); + store.update(path, new DataUpdater<ZNRecord>() { + + @Override + public ZNRecord update(ZNRecord currentData) { + if (currentData != null) { + currentData.setSimpleField("key2", "value2"); + } + return currentData; + } + }, AccessOption.PERSISTENT); + } + + for (int i = 0; i < 10; i++) { + String path = String.format("/%d", i); + ZNRecord record = store.get(path, null, 0); + Assert.assertNotNull(record); + Assert.assertEquals(record.getId(), "" + i); + Assert.assertNotNull(record.getSimpleField("key1")); + Assert.assertEquals(record.getSimpleField("key1"), "value1"); + Assert.assertNotNull(record.getSimpleField("key2")); + Assert.assertEquals(record.getSimpleField("key2"), "value2"); + } + + // set shall use new path + for (int i = 10; i < 20; i++) { + String path = String.format("/%d", i); + ZNRecord record = new ZNRecord("" + i); + record.setSimpleField("key3", "value3"); + store.set(path, record, AccessOption.PERSISTENT); + } + + for (int i = 10; i < 20; i++) { + String path = String.format("/%d", i); + ZNRecord record = store.get(path, null, 0); + Assert.assertNotNull(record); + Assert.assertEquals(record.getId(), "" + i); + Assert.assertNotNull(record.getSimpleField("key3")); + Assert.assertEquals(record.getSimpleField("key3"), "value3"); + } + + participants[0].syncStop(); + System.out.println("END " + clusterName + " at " + new Date(System.currentTimeMillis())); + } + +}
