some more refactoring, tests and doc
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/c40a3836 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/c40a3836 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/c40a3836 Branch: refs/heads/master Commit: c40a3836131ad49279e05a5c3dd6656848c1eb60 Parents: d80651a Author: randgalt <[email protected]> Authored: Fri Jul 14 13:32:05 2017 -0500 Committer: randgalt <[email protected]> Committed: Fri Jul 14 13:32:05 2017 -0500 ---------------------------------------------------------------------- .../x/async/migrations/MigrationManager.java | 20 ++-- .../x/async/migrations/MigrationSet.java | 14 +-- .../src/site/confluence/index.confluence | 9 +- .../src/site/confluence/migrations.confluence | 97 ++++++++++++++++++++ curator-x-async/src/site/site.xml | 2 +- .../async/migrations/TestMigrationManager.java | 36 +++++++- 6 files changed, 150 insertions(+), 28 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/c40a3836/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java index cb3d6ff..676eef6 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java @@ -50,6 +50,7 @@ public class MigrationManager { private final AsyncCuratorFramework client; private final String lockPath; + private final String metaDataPath; private final Executor executor; private final Duration lockMax; @@ -58,13 +59,15 @@ public class MigrationManager /** * @param client the curator client * 
@param lockPath base path for locks used by the manager + * @param metaDataPath base path to store the meta data * @param executor the executor to use * @param lockMax max time to wait for locks */ - public MigrationManager(AsyncCuratorFramework client, String lockPath, Executor executor, Duration lockMax) + public MigrationManager(AsyncCuratorFramework client, String lockPath, String metaDataPath, Executor executor, Duration lockMax) { this.client = Objects.requireNonNull(client, "client cannot be null"); this.lockPath = Objects.requireNonNull(lockPath, "lockPath cannot be null"); + this.metaDataPath = Objects.requireNonNull(metaDataPath, "metaDataPath cannot be null"); this.executor = Objects.requireNonNull(executor, "executor cannot be null"); this.lockMax = Objects.requireNonNull(lockMax, "lockMax cannot be null"); } @@ -139,8 +142,9 @@ public class MigrationManager private CompletionStage<Void> runMigrationInLock(InterProcessLock lock, MigrationSet set) { - return childrenWithData(client, set.metaDataPath()) - .thenCompose(metaData -> applyMetaData(set, metaData)) + String thisMetaDataPath = ZKPaths.makePath(metaDataPath, set.id()); + return childrenWithData(client, thisMetaDataPath) + .thenCompose(metaData -> applyMetaData(set, metaData, thisMetaDataPath)) .handle((v, e) -> { release(lock, true); if ( e != null ) @@ -152,7 +156,7 @@ public class MigrationManager ); } - private CompletionStage<Void> applyMetaData(MigrationSet set, Map<String, byte[]> metaData) + private CompletionStage<Void> applyMetaData(MigrationSet set, Map<String, byte[]> metaData, String thisMetaDataPath) { List<byte[]> sortedMetaData = metaData.keySet() .stream() @@ -177,13 +181,13 @@ public class MigrationManager return CompletableFuture.completedFuture(null); } - return asyncEnsureContainers(client, set.metaDataPath()) - .thenCompose(__ -> applyMetaDataAfterEnsure(set, toBeApplied)); + return asyncEnsureContainers(client, thisMetaDataPath) + .thenCompose(__ -> 
applyMetaDataAfterEnsure(set, toBeApplied, thisMetaDataPath)); } - private CompletionStage<Void> applyMetaDataAfterEnsure(MigrationSet set, List<Migration> toBeApplied) + private CompletionStage<Void> applyMetaDataAfterEnsure(MigrationSet set, List<Migration> toBeApplied, String thisMetaDataPath) { - String metaDataBasePath = ZKPaths.makePath(set.metaDataPath(), META_DATA_NODE_NAME); + String metaDataBasePath = ZKPaths.makePath(thisMetaDataPath, META_DATA_NODE_NAME); List<CompletableFuture<Object>> stages = toBeApplied.stream().map(migration -> { List<CuratorOp> operations = new ArrayList<>(); operations.addAll(migration.operations()); http://git-wip-us.apache.org/repos/asf/curator/blob/c40a3836/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationSet.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationSet.java b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationSet.java index 089d3d8..94b5205 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationSet.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationSet.java @@ -34,19 +34,13 @@ public interface MigrationSet String id(); /** - * @return where to store the meta data for this migration set - */ - String metaDataPath(); - - /** * @return list of migrations in the order that they should be applied */ List<Migration> migrations(); - static MigrationSet build(String id, String metaDataPath, List<Migration> migrations) + static MigrationSet build(String id, List<Migration> migrations) { Objects.requireNonNull(id, "id cannot be null"); - Objects.requireNonNull(metaDataPath, "metaDataPath cannot be null"); final List<Migration> migrationsCopy = ImmutableList.copyOf(migrations); return new MigrationSet() { @@ -57,12 +51,6 @@ public interface MigrationSet } @Override - public String 
metaDataPath() - { - return metaDataPath; - } - - @Override public List<Migration> migrations() { return migrationsCopy; http://git-wip-us.apache.org/repos/asf/curator/blob/c40a3836/curator-x-async/src/site/confluence/index.confluence ---------------------------------------------------------------------- diff --git a/curator-x-async/src/site/confluence/index.confluence b/curator-x-async/src/site/confluence/index.confluence index 74a47b4..64f3786 100644 --- a/curator-x-async/src/site/confluence/index.confluence +++ b/curator-x-async/src/site/confluence/index.confluence @@ -40,7 +40,6 @@ This is a strongly typed DSL that allows you to map a Curator\-style client to: * Options for how nodes should be created (sequential, compressed data, ttl, etc.) * ACLs for the nodes at the path * Options for how to delete nodes (guaranteed, deleting children, etc.) -* Perform ZooKeeper data migration For example: @@ -50,3 +49,11 @@ modeled.set(new Foo()); {code} See [[Modeled Curator|modeled.html]] for details. + +h2. [[Migrations|migrations.html]] + +Curator Migrations allow you to pre\-apply transactions in a staged manner so that you +can ensure a consistent state for parts of your ZooKeeper node hierarchy in a manner +similar to database migration utilities. + +See [[Migrations|migrations.html]] for details. http://git-wip-us.apache.org/repos/asf/curator/blob/c40a3836/curator-x-async/src/site/confluence/migrations.confluence ---------------------------------------------------------------------- diff --git a/curator-x-async/src/site/confluence/migrations.confluence b/curator-x-async/src/site/confluence/migrations.confluence new file mode 100644 index 0000000..4775fac --- /dev/null +++ b/curator-x-async/src/site/confluence/migrations.confluence @@ -0,0 +1,97 @@ +h1. 
Migrations + +Curator Migrations allow you to pre\-apply transactions in a staged manner so that you +can ensure a consistent state for parts of your ZooKeeper node hierarchy in a manner +similar to database migration utilities. + +h2. Background and Usage + +Note: To use Migrations, you should be familiar with Java 8's lambdas, CompletableFuture and CompletionStage. + +A "migration" is a set of operations to be performed in a transaction. A "migration set" is a list +of migrations. Combined, this can be used to ensure an initial state for your ZooKeeper nodes as +well as supporting upgrading/modifying existing state. + +For example, given a brand new ZooKeeper instance you might want to populate a few nodes +and data. E.g. + +{code} +CuratorOp op1 = client.transactionOp().create().forPath("/parent"); +CuratorOp op2 = client.transactionOp().create().forPath("/parent/one"); +CuratorOp op3 = client.transactionOp().create().forPath("/parent/two"); +CuratorOp op4 = client.transactionOp().create().forPath("/parent/three"); +CuratorOp op5 = client.transactionOp().create().forPath("/main", someData); +{code} + +All 5 of these operations would be combined into a migration and set: + +{code} +Migration migration = () -> Arrays.asList(op1, op2, op3, op4, op5); +MigrationSet set = MigrationSet.build("main", Collections.singletonList(migration)); +{code} + +This set can then be passed to a {{MigrationManager}} for processing. The MigrationManager +checks to see if the migration has been applied already and, if not, processes the transaction. + +At a future date, the migration set could be expanded to update/modify things. E.g. 
+ +{code} +CuratorOp newOp1 = client.transactionOp().create().forPath("/new"); +CuratorOp newOp2 = client.transactionOp().delete().forPath("/main"); // maybe this is no longer needed +{code} + +This would be combined with the previous migration: + +{code} +Migration initialMigration = () -> Arrays.asList(op1, op2, op3, op4, op5); +Migration newMigration = () -> Arrays.asList(newOp1, newOp2); +MigrationSet set = MigrationSet.build("main", Arrays.asList(initialMigration, newMigration)); +{code} + +When this set is run, the MigrationManager will perform both migration operations on new +ZooKeeper databases but only the second "newMigration" on ZK databases that already have +the first migration applied. + +h2. Details/Reference + +_Migration_ + +A Migration is a wrapper around a list of operations that constitute one stage in a migration +set and are applied as a single transaction. + +_MigrationSet_ + +A MigrationSet is an ordered list of Migrations. Curator keeps track of which migrations in a +set have been previously applied and only processes un\-applied migrations. Each migration +set must have a unique identifier. Create a MigrationSet via its builder: + +{code} +MigrationSet set = MigrationSet.build(migrationId, migrations); +{code} + +_MigrationManager_ + +The MigrationManager processes MigrationSets. Usually, you'd run this only on new ZooKeeper +databases or as part of a maintenance operation to update the ZooKeeper database. E.g. + +{code} +MigrationManager manager = new MigrationManager(client, + lockPath, // base path for locks used by the manager + metaDataPath, // base path to store the meta data + executor, // the executor to use + lockMax // max time to wait for locks +); +manager.migrate(set).exceptionally(e -> { + if ( e instanceof MigrationException ) { + // migration checksum failed, etc. + } else { + // some other kind of error + } + return null; +}); +{code} + +Each migration in the set is applied in a transaction. 
MigrationManager stores a hash +of all operations in a migration so that it can be compared for future operations. i.e. +if, in the future, a migration set is attempted but the hash of one of the previous migrations +does not match, the stage completes exceptionally with {{MigrationException}}. http://git-wip-us.apache.org/repos/asf/curator/blob/c40a3836/curator-x-async/src/site/site.xml ---------------------------------------------------------------------- diff --git a/curator-x-async/src/site/site.xml b/curator-x-async/src/site/site.xml index f78abc7..fc0a67a 100644 --- a/curator-x-async/src/site/site.xml +++ b/curator-x-async/src/site/site.xml @@ -25,7 +25,7 @@ <link rel="stylesheet" href="../css/site.css" /> <script type="text/javascript"> $(function(){ - if ( location && location.pathname && location.pathname.endsWith('/index.html') ) { + if ( location && location.pathname && (location.pathname.endsWith('/index.html') || location.pathname.endsWith('/migrations.html')) ) { $('a[title="Java 8/Async"]').parent().addClass("active"); } else { $('a[title="Strongly Typed Models"]').parent().addClass("active"); http://git-wip-us.apache.org/repos/asf/curator/blob/c40a3836/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java index 42bc76d..637aca1 100644 --- a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java +++ b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java @@ -109,7 +109,7 @@ public class TestMigrationManager extends CompletableBaseClassForTests v3op = ModeledFramework.wrap(client, v3Spec).updateOp(new ModelV3("One", "Two", 30)); executor = 
Executors.newCachedThreadPool(); - manager = new MigrationManager(client, ZPath.parse("/locks"), executor, Duration.ofMinutes(10)); + manager = new MigrationManager(client, "/migrations/locks", "/migrations/metadata", executor, Duration.ofMinutes(10)); } @AfterMethod @@ -127,7 +127,7 @@ public class TestMigrationManager extends CompletableBaseClassForTests Migration m1 = () -> Arrays.asList(v1opA, v1opB); Migration m2 = () -> Collections.singletonList(v2op); Migration m3 = () -> Collections.singletonList(v3op); - MigrationSet migrationSet = MigrationSet.build("1", "/metadata", Arrays.asList(m1, m2, m3)); + MigrationSet migrationSet = MigrationSet.build("1", Arrays.asList(m1, m2, m3)); complete(manager.migrate(migrationSet)); @@ -143,14 +143,14 @@ public class TestMigrationManager extends CompletableBaseClassForTests public void testStaged() throws Exception { Migration m1 = () -> Arrays.asList(v1opA, v1opB); - MigrationSet migrationSet = MigrationSet.build("1", "/metadata/nodes", Collections.singletonList(m1)); + MigrationSet migrationSet = MigrationSet.build("1", Collections.singletonList(m1)); complete(manager.migrate(migrationSet)); ModeledFramework<ModelV1> v1Client = ModeledFramework.wrap(client, v1Spec); complete(v1Client.read(), (m, e) -> Assert.assertEquals(m.getName(), "Test")); Migration m2 = () -> Collections.singletonList(v2op); - migrationSet = MigrationSet.build("1", "/metadata/nodes", Arrays.asList(m1, m2)); + migrationSet = MigrationSet.build("1", Arrays.asList(m1, m2)); complete(manager.migrate(migrationSet)); ModeledFramework<ModelV2> v2Client = ModeledFramework.wrap(client, v2Spec); @@ -160,7 +160,7 @@ public class TestMigrationManager extends CompletableBaseClassForTests }); Migration m3 = () -> Collections.singletonList(v3op); - migrationSet = MigrationSet.build("1", "/metadata/nodes", Arrays.asList(m1, m2, m3)); + migrationSet = MigrationSet.build("1", Arrays.asList(m1, m2, m3)); complete(manager.migrate(migrationSet)); 
ModeledFramework<ModelV3> v3Client = ModeledFramework.wrap(client, v3Spec); @@ -170,4 +170,30 @@ public class TestMigrationManager extends CompletableBaseClassForTests Assert.assertEquals(m.getLastName(), "Two"); }); } + + @Test + public void testDocExample() throws Exception + { + CuratorOp op1 = client.transactionOp().create().forPath("/parent"); + CuratorOp op2 = client.transactionOp().create().forPath("/parent/one"); + CuratorOp op3 = client.transactionOp().create().forPath("/parent/two"); + CuratorOp op4 = client.transactionOp().create().forPath("/parent/three"); + CuratorOp op5 = client.transactionOp().create().forPath("/main", "hey".getBytes()); + + Migration initialMigration = () -> Arrays.asList(op1, op2, op3, op4, op5); + MigrationSet migrationSet = MigrationSet.build("main", Collections.singletonList(initialMigration)); + complete(manager.migrate(migrationSet)); + + Assert.assertNotNull(client.unwrap().checkExists().forPath("/parent/three")); + Assert.assertEquals(client.unwrap().getData().forPath("/main"), "hey".getBytes()); + + CuratorOp newOp1 = client.transactionOp().create().forPath("/new"); + CuratorOp newOp2 = client.transactionOp().delete().forPath("/main"); // maybe this is no longer needed + + Migration newMigration = () -> Arrays.asList(newOp1, newOp2); + migrationSet = MigrationSet.build("main", Arrays.asList(initialMigration, newMigration)); + complete(manager.migrate(migrationSet)); + + Assert.assertNull(client.unwrap().checkExists().forPath("/main")); + } }
