Refactoring and simplification. There is no need for ids and versions in Migrations/MetaData; a hash of each migration's operations can be auto-generated instead.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/c8df9a41 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/c8df9a41 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/c8df9a41 Branch: refs/heads/master Commit: c8df9a414b9a035f45946460ff7e1adff7fd65d4 Parents: 4f12abc Author: randgalt <[email protected]> Authored: Fri Jul 14 12:00:27 2017 -0500 Committer: randgalt <[email protected]> Committed: Fri Jul 14 12:00:27 2017 -0500 ---------------------------------------------------------------------- .../imps/CuratorMultiTransactionRecord.java | 11 +++ .../framework/imps/ExtractingCuratorOp.java | 8 +- .../x/async/modeled/migrations/MetaData.java | 74 +----------------- .../x/async/modeled/migrations/Migration.java | 49 ++---------- .../modeled/migrations/MigrationManager.java | 81 +++++++++++++------- .../src/site/confluence/index.confluence | 1 + .../migrations/TestMigrationManager.java | 14 ++-- 7 files changed, 87 insertions(+), 151 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/c8df9a41/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java index 0611df6..3e72609 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/CuratorMultiTransactionRecord.java @@ -23,6 +23,7 @@ import org.apache.curator.framework.api.transaction.OperationType; import org.apache.curator.framework.api.transaction.TypeAndPath; import 
org.apache.zookeeper.MultiTransactionRecord; import org.apache.zookeeper.Op; +import java.security.MessageDigest; import java.util.List; class CuratorMultiTransactionRecord extends MultiTransactionRecord @@ -50,4 +51,14 @@ class CuratorMultiTransactionRecord extends MultiTransactionRecord { return metadata.size(); } + + void addToDigest(MessageDigest digest) + { + for ( Op op : this ) + { + digest.update(op.getPath().getBytes()); + digest.update(Integer.toString(op.getType()).getBytes()); + digest.update(op.toRequestRecord().toString().getBytes()); + } + } } http://git-wip-us.apache.org/repos/asf/curator/blob/c8df9a41/curator-framework/src/main/java/org/apache/curator/framework/imps/ExtractingCuratorOp.java ---------------------------------------------------------------------- diff --git a/curator-framework/src/main/java/org/apache/curator/framework/imps/ExtractingCuratorOp.java b/curator-framework/src/main/java/org/apache/curator/framework/imps/ExtractingCuratorOp.java index 7a5db69..58a1572 100644 --- a/curator-framework/src/main/java/org/apache/curator/framework/imps/ExtractingCuratorOp.java +++ b/curator-framework/src/main/java/org/apache/curator/framework/imps/ExtractingCuratorOp.java @@ -22,8 +22,9 @@ import com.google.common.base.Preconditions; import org.apache.curator.framework.api.transaction.CuratorOp; import org.apache.curator.framework.api.transaction.TypeAndPath; import org.apache.zookeeper.Op; +import java.security.MessageDigest; -class ExtractingCuratorOp implements CuratorOp +public class ExtractingCuratorOp implements CuratorOp { private final CuratorMultiTransactionRecord record = new CuratorMultiTransactionRecord(); @@ -46,6 +47,11 @@ class ExtractingCuratorOp implements CuratorOp return record.iterator().next(); } + public void addToDigest(MessageDigest digest) + { + + } + private void validate() { Preconditions.checkArgument(record.size() > 0, "No operation has been added"); 
http://git-wip-us.apache.org/repos/asf/curator/blob/c8df9a41/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java index da40a5b..8377967 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java @@ -18,76 +18,8 @@ */ package org.apache.curator.x.async.modeled.migrations; -import java.util.Objects; - -/** - * The meta data of a single migration - */ -public class MetaData +@FunctionalInterface +public interface MetaData { - private final String migrationId; - private final int migrationVersion; - - public MetaData() - { - this("", 0); - } - - public MetaData(String migrationId, int migrationVersion) - { - this.migrationId = Objects.requireNonNull(migrationId, "migrationId cannot be null"); - this.migrationVersion = migrationVersion; - } - - /** - * @return The ID of the migration that was applied - */ - public String getMigrationId() - { - return migrationId; - } - - /** - * @return the version of the migration that was applied - */ - public int getMigrationVersion() - { - return migrationVersion; - } - - @Override - public boolean equals(Object o) - { - if ( this == o ) - { - return true; - } - if ( o == null || getClass() != o.getClass() ) - { - return false; - } - - MetaData metaData = (MetaData)o; - - //noinspection SimplifiableIfStatement - if ( migrationVersion != metaData.migrationVersion ) - { - return false; - } - return migrationId.equals(metaData.migrationId); - } - - @Override - public int hashCode() - { - int result = migrationId.hashCode(); - result = 31 * result + migrationVersion; - return result; - } - - @Override 
- public String toString() - { - return "MetaData{" + "migrationId='" + migrationId + '\'' + ", migrationVersion=" + migrationVersion + '}'; - } + byte[] operationHash(); } http://git-wip-us.apache.org/repos/asf/curator/blob/c8df9a41/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java index b456580..972c59e 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java @@ -20,57 +20,18 @@ package org.apache.curator.x.async.modeled.migrations; import org.apache.curator.framework.api.transaction.CuratorOp; import java.util.List; -import java.util.Objects; -import java.util.function.Supplier; /** * Models a single migration/transition */ +@FunctionalInterface public interface Migration { /** - * @return the unique ID for this migration - */ - String id(); - - /** - * @return the version of this migration - */ - int version(); - - /** - * @return the operations to execute in a transaction + * Return the operations to execute in a transaction. IMPORTANT: during a migration + * this method may be called multiple times. 
+ * + * @return operations */ List<CuratorOp> operations(); - - static Migration build(String id, Supplier<List<CuratorOp>> operationsProc) - { - return build(id, 1, operationsProc); - } - - static Migration build(String id, int version, Supplier<List<CuratorOp>> operationsProc) - { - Objects.requireNonNull(id, "id cannot be null"); - Objects.requireNonNull(operationsProc, "operationsProc cannot be null"); - return new Migration() - { - @Override - public String id() - { - return id; - } - - @Override - public int version() - { - return version; - } - - @Override - public List<CuratorOp> operations() - { - return operationsProc.get(); - } - }; - } } http://git-wip-us.apache.org/repos/asf/curator/blob/c8df9a41/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java index 01de2f8..d6d37de 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java @@ -20,6 +20,7 @@ package org.apache.curator.x.async.modeled.migrations; import com.google.common.base.Throwables; import org.apache.curator.framework.api.transaction.CuratorOp; +import org.apache.curator.framework.imps.ExtractingCuratorOp; import org.apache.curator.framework.recipes.locks.InterProcessLock; import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex; import org.apache.curator.x.async.AsyncCuratorFramework; @@ -29,8 +30,11 @@ import org.apache.curator.x.async.modeled.ModeledFramework; import org.apache.curator.x.async.modeled.ZNode; import org.apache.curator.x.async.modeled.ZPath; import org.apache.zookeeper.CreateMode; 
+import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; import java.time.Duration; import java.util.ArrayList; +import java.util.Arrays; import java.util.Comparator; import java.util.List; import java.util.Objects; @@ -49,27 +53,21 @@ public class MigrationManager { private final AsyncCuratorFramework client; private final ZPath lockPath; - private final ModelSerializer<MetaData> metaDataSerializer; private final Executor executor; private final Duration lockMax; private static final String META_DATA_NODE_NAME = "meta-"; /** - * Jackson usage: See the note in {@link org.apache.curator.x.async.modeled.JacksonModelSerializer} regarding how the Jackson library is specified in Curator's Maven file. - * Unless you are not using Jackson pass <code>JacksonModelSerializer.build(MetaData.class)</code> for <code>metaDataSerializer</code> - * * @param client the curator client * @param lockPath base path for locks used by the manager - * @param metaDataSerializer JacksonModelSerializer.build(MetaData.class) * @param executor the executor to use * @param lockMax max time to wait for locks */ - public MigrationManager(AsyncCuratorFramework client, ZPath lockPath, ModelSerializer<MetaData> metaDataSerializer, Executor executor, Duration lockMax) + public MigrationManager(AsyncCuratorFramework client, ZPath lockPath, Executor executor, Duration lockMax) { this.client = Objects.requireNonNull(client, "client cannot be null"); this.lockPath = Objects.requireNonNull(lockPath, "lockPath cannot be null"); - this.metaDataSerializer = Objects.requireNonNull(metaDataSerializer, "metaDataSerializer cannot be null"); this.executor = Objects.requireNonNull(executor, "executor cannot be null"); this.lockMax = Objects.requireNonNull(lockMax, "lockMax cannot be null"); } @@ -90,18 +88,6 @@ public class MigrationManager } /** - * Utility to return the meta data from previous migrations - * - * @param set the set - * @return stage - */ - public 
CompletionStage<List<MetaData>> metaData(MigrationSet set) - { - ModeledFramework<MetaData> modeled = getMetaDataClient(set.metaDataPath()); - return ZNode.models(modeled.childrenAsZNodes()); - } - - /** * Can be overridden to change how the comparison to previous migrations is done. The default * version ensures that the meta data from previous migrations matches the current migration * set exactly (by order and version). If there is a mismatch, <code>MigrationException</code> is thrown. @@ -115,21 +101,46 @@ public class MigrationManager { if ( sortedMetaData.size() > set.migrations().size() ) { - throw new MigrationException(set.id(), String.format("More metadata than migrations. Migration ID: %s - MigrationSet: %s - MetaData: %s", set.id(), set.migrations(), sortedMetaData)); + throw new MigrationException(set.id(), String.format("More metadata than migrations. Migration ID: %s", set.id())); } int compareSize = Math.min(set.migrations().size(), sortedMetaData.size()); - List<MetaData> compareMigrations = set.migrations().subList(0, compareSize) - .stream() - .map(m -> new MetaData(m.id(), m.version())) - .collect(Collectors.toList()); - if ( !compareMigrations.equals(sortedMetaData) ) + List<Migration> subList = set.migrations().subList(0, compareSize); + for ( int i = 0; i < compareSize; ++i ) { - throw new MigrationException(set.id(), String.format("Metadata mismatch. Migration ID: %s - MigrationSet: %s - MetaData: %s", set.id(), set.migrations(), sortedMetaData)); + byte[] setHash = hash(set.migrations().get(i).operations()).operationHash(); + if ( !Arrays.equals(setHash, sortedMetaData.get(i).operationHash()) ) + { + throw new MigrationException(set.id(), String.format("Metadata mismatch. 
Migration ID: %s", set.id())); + } } return set.migrations().subList(sortedMetaData.size(), set.migrations().size()); } + private MetaData hash(List<CuratorOp> operations) + { + MessageDigest digest; + try + { + digest = MessageDigest.getInstance("SHA-256"); + } + catch ( NoSuchAlgorithmException e ) + { + throw new RuntimeException(e); + } + operations.forEach(op -> { + if ( op instanceof ExtractingCuratorOp ) + { + ((ExtractingCuratorOp)op).addToDigest(digest); + } + else + { + digest.update(op.toString().getBytes()); + } + }); + return digest::digest; + } + private CompletionStage<Void> runMigrationInLock(InterProcessLock lock, MigrationSet set) { ModeledFramework<MetaData> modeled = getMetaDataClient(set.metaDataPath()); @@ -148,7 +159,21 @@ public class MigrationManager private ModeledFramework<MetaData> getMetaDataClient(ZPath metaDataPath) { - ModelSpec<MetaData> modelSpec = ModelSpec.builder(metaDataPath, metaDataSerializer).withCreateMode(CreateMode.PERSISTENT_SEQUENTIAL).build(); + ModelSerializer<MetaData> serializer = new ModelSerializer<MetaData>() + { + @Override + public byte[] serialize(MetaData model) + { + return model.operationHash(); + } + + @Override + public MetaData deserialize(byte[] bytes) + { + return () -> bytes; + } + }; + ModelSpec<MetaData> modelSpec = ModelSpec.builder(metaDataPath, serializer).withCreateMode(CreateMode.PERSISTENT_SEQUENTIAL).build(); return ModeledFramework.wrap(client, modelSpec); } @@ -186,7 +211,7 @@ public class MigrationManager List<CompletableFuture<Object>> stages = toBeApplied.stream().map(migration -> { List<CuratorOp> operations = new ArrayList<>(); operations.addAll(migration.operations()); - MetaData thisMetaData = new MetaData(migration.id(), migration.version()); + MetaData thisMetaData = hash(operations); operations.add(metaDataClient.child(META_DATA_NODE_NAME).createOp(thisMetaData)); return client.transaction().forOperations(operations).thenApply(__ -> null).toCompletableFuture(); 
}).collect(Collectors.toList()); http://git-wip-us.apache.org/repos/asf/curator/blob/c8df9a41/curator-x-async/src/site/confluence/index.confluence ---------------------------------------------------------------------- diff --git a/curator-x-async/src/site/confluence/index.confluence b/curator-x-async/src/site/confluence/index.confluence index 4d81d44..74a47b4 100644 --- a/curator-x-async/src/site/confluence/index.confluence +++ b/curator-x-async/src/site/confluence/index.confluence @@ -40,6 +40,7 @@ This is a strongly typed DSL that allows you to map a Curator\-style client to: * Options for how nodes should be created (sequential, compressed data, ttl, etc.) * ACLs for the nodes at the path * Options for how to delete nodes (guaranteed, deleting children, etc.) +* Perform ZooKeeper data migration For example: http://git-wip-us.apache.org/repos/asf/curator/blob/c8df9a41/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java index 3fe5de2..45aa130 100644 --- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java +++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java @@ -109,7 +109,7 @@ public class TestMigrationManager extends CompletableBaseClassForTests v3op = ModeledFramework.wrap(client, v3Spec).updateOp(new ModelV3("One", "Two", 30)); executor = Executors.newCachedThreadPool(); - manager = new MigrationManager(client, ZPath.parse("/locks"), JacksonModelSerializer.build(MetaData.class), executor, Duration.ofMinutes(10)); + manager = new MigrationManager(client, ZPath.parse("/locks"), executor, Duration.ofMinutes(10)); } 
@AfterMethod @@ -124,9 +124,9 @@ public class TestMigrationManager extends CompletableBaseClassForTests @Test public void testBasic() throws Exception { - Migration m1 = Migration.build("1",1, () -> Arrays.asList(v1opA, v1opB)); - Migration m2 = Migration.build("2",1, () -> Collections.singletonList(v2op)); - Migration m3 = Migration.build("3",1, () -> Collections.singletonList(v3op)); + Migration m1 = () -> Arrays.asList(v1opA, v1opB); + Migration m2 = () -> Collections.singletonList(v2op); + Migration m3 = () -> Collections.singletonList(v3op); MigrationSet migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Arrays.asList(m1, m2, m3)); complete(manager.migrate(migrationSet)); @@ -142,14 +142,14 @@ public class TestMigrationManager extends CompletableBaseClassForTests @Test public void testStaged() throws Exception { - Migration m1 = Migration.build("1",1, () -> Arrays.asList(v1opA, v1opB)); + Migration m1 = () -> Arrays.asList(v1opA, v1opB); MigrationSet migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Collections.singletonList(m1)); complete(manager.migrate(migrationSet)); ModeledFramework<ModelV1> v1Client = ModeledFramework.wrap(client, v1Spec); complete(v1Client.read(), (m, e) -> Assert.assertEquals(m.getName(), "Test")); - Migration m2 = Migration.build("2",1, () -> Collections.singletonList(v2op)); + Migration m2 = () -> Collections.singletonList(v2op); migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Arrays.asList(m1, m2)); complete(manager.migrate(migrationSet)); @@ -159,7 +159,7 @@ public class TestMigrationManager extends CompletableBaseClassForTests Assert.assertEquals(m.getAge(), 10); }); - Migration m3 = Migration.build("3",1, () -> Collections.singletonList(v3op)); + Migration m3 = () -> Collections.singletonList(v3op); migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Arrays.asList(m1, m2, m3)); complete(manager.migrate(migrationSet));
