Major refactoring and simplification of the migrations code. It no longer 
depends on any modeled code, so it has been moved to the parent async package.


Project: http://git-wip-us.apache.org/repos/asf/curator/repo
Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/d80651a7
Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/d80651a7
Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/d80651a7

Branch: refs/heads/master
Commit: d80651a7a276b0ce999601c30e205aceaae94d4c
Parents: 2ab172a
Author: randgalt <[email protected]>
Authored: Fri Jul 14 12:32:37 2017 -0500
Committer: randgalt <[email protected]>
Committed: Fri Jul 14 12:32:37 2017 -0500

----------------------------------------------------------------------
 .../apache/curator/x/async/AsyncWrappers.java   |  96 +++++++-
 .../curator/x/async/migrations/Migration.java   |  37 ++++
 .../x/async/migrations/MigrationException.java  |  37 ++++
 .../x/async/migrations/MigrationManager.java    | 195 ++++++++++++++++
 .../x/async/migrations/MigrationSet.java        |  72 ++++++
 .../x/async/modeled/migrations/MetaData.java    |  25 ---
 .../x/async/modeled/migrations/Migration.java   |  37 ----
 .../modeled/migrations/MigrationException.java  |  37 ----
 .../modeled/migrations/MigrationManager.java    | 220 -------------------
 .../async/modeled/migrations/MigrationSet.java  |  73 ------
 .../async/migrations/TestMigrationManager.java  | 173 +++++++++++++++
 .../x/async/migrations/models/ModelV1.java      |  39 ++++
 .../x/async/migrations/models/ModelV2.java      |  46 ++++
 .../x/async/migrations/models/ModelV3.java      |  53 +++++
 .../migrations/TestMigrationManager.java        | 173 ---------------
 .../modeled/migrations/models/ModelV1.java      |  39 ----
 .../modeled/migrations/models/ModelV2.java      |  46 ----
 .../modeled/migrations/models/ModelV3.java      |  53 -----
 18 files changed, 747 insertions(+), 704 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java 
b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
index 9630985..d7b3cc3 100644
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java
@@ -18,12 +18,16 @@
  */
 package org.apache.curator.x.async;
 
+import com.google.common.base.Throwables;
+import com.google.common.collect.Maps;
 import org.apache.curator.framework.recipes.locks.InterProcessLock;
 import org.apache.curator.utils.ThreadUtils;
 import org.apache.curator.utils.ZKPaths;
 import org.apache.curator.x.async.api.ExistsOption;
-import org.apache.curator.x.async.modeled.ZPath;
+import org.apache.zookeeper.KeeperException;
 import java.util.Collections;
+import java.util.List;
+import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.CompletableFuture;
 import java.util.concurrent.CompletionStage;
@@ -70,6 +74,66 @@ import java.util.concurrent.TimeUnit;
 public class AsyncWrappers
 {
     /**
+     * <p>
+     * Return the children of the given path (keyed by the full path) and the 
data for each node.
+     * IMPORTANT: this results in a ZooKeeper query
+     * for each child node returned. i.e. if the initial children() call 
returns
+     * 10 nodes an additional 10 ZooKeeper queries are made to get the data.
+     * </p>
+     *
+     * <p>
+     * Note: if any of the nodes in the path do not exist yet, {@link 
org.apache.zookeeper.KeeperException.NoNodeException}
+     * is <strong>NOT</strong> set. Instead the stage is completed with an 
empty map.
+     * </p>
+     *
+     * @return CompletionStage
+     */
+    public static CompletionStage<Map<String, byte[]>> 
childrenWithData(AsyncCuratorFramework client, String path)
+    {
+        return childrenWithData(client, path, false);
+    }
+
+    /**
+     * <p>
+     * Return the children of the given path (keyed by the full path) and the 
data for each node.
+     * IMPORTANT: this results in a ZooKeeper query
+     * for each child node returned. i.e. if the initial children() call 
returns
+     * 10 nodes an additional 10 ZooKeeper queries are made to get the data.
+     * </p>
+     *
+     * <p>
+     * Note: if any of the nodes in the path do not exist yet, {@link 
org.apache.zookeeper.KeeperException.NoNodeException}
+     * is <strong>NOT</strong> set. Instead the stage is completed with an 
empty map.
+     * </p>
+     *
+     * @param isCompressed pass true if data is compressed
+     * @return CompletionStage
+     */
+    public static CompletionStage<Map<String, byte[]>> 
childrenWithData(AsyncCuratorFramework client, String path, boolean 
isCompressed)
+    {
+        CompletableFuture<Map<String, byte[]>> future = new 
CompletableFuture<>();
+        client.getChildren().forPath(path).handle((children, e) -> {
+            if ( e != null )
+            {
+                if ( Throwables.getRootCause(e) instanceof 
KeeperException.NoNodeException )
+                {
+                    future.complete(Maps.newHashMap());
+                }
+                else
+                {
+                    future.completeExceptionally(e);
+                }
+            }
+            else
+            {
+                completeChildren(client, future, path, children, isCompressed);
+            }
+            return null;
+        });
+        return future;
+    }
+
+    /**
      * Asynchronously call {@link 
org.apache.curator.framework.CuratorFramework#createContainers(String)} using
      * the given executor
      *
@@ -279,6 +343,36 @@ public class AsyncWrappers
         }
     }
 
+    private static void completeChildren(AsyncCuratorFramework client, 
CompletableFuture<Map<String, byte[]>> future, String parentPath, List<String> 
children, boolean isCompressed)
+    {
+        Map<String, byte[]> nodes = Maps.newHashMap();
+        if ( children.size() == 0 )
+        {
+            future.complete(nodes);
+            return;
+        }
+
+        children.forEach(node -> {
+            String path = ZKPaths.makePath(parentPath, node);
+            AsyncStage<byte[]> stage = isCompressed ? 
client.getData().decompressed().forPath(path) : client.getData().forPath(path);
+            stage.handle((data, e) -> {
+                if ( e != null )
+                {
+                    future.completeExceptionally(e);
+                }
+                else
+                {
+                    nodes.put(path, data);
+                    if ( nodes.size() == children.size() )
+                    {
+                        future.complete(nodes);
+                    }
+                }
+                return null;
+            });
+        });
+    }
+
     private AsyncWrappers()
     {
     }

http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/Migration.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/Migration.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/Migration.java
new file mode 100644
index 0000000..daac435
--- /dev/null
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/Migration.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.migrations;
+
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import java.util.List;
+
+/**
+ * Models a single migration/transition
+ */
+@FunctionalInterface
+public interface Migration
+{
+    /**
+     * Return the operations to execute in a transaction. IMPORTANT: during a 
migration
+     * this method may be called multiple times.
+     *
+     * @return operations
+     */
+    List<CuratorOp> operations();
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationException.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationException.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationException.java
new file mode 100644
index 0000000..c4971ce
--- /dev/null
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationException.java
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.migrations;
+
+import java.util.Objects;
+
+public class MigrationException extends RuntimeException
+{
+    private final String migrationId;
+
+    public MigrationException(String migrationId, String message)
+    {
+        super(message);
+        this.migrationId = Objects.requireNonNull(migrationId, "migrationId 
cannot be null");
+    }
+
+    public String getMigrationId()
+    {
+        return migrationId;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java
new file mode 100644
index 0000000..cb3d6ff
--- /dev/null
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java
@@ -0,0 +1,195 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.migrations;
+
+import com.google.common.base.Throwables;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.framework.imps.ExtractingCuratorOp;
+import org.apache.curator.framework.recipes.locks.InterProcessLock;
+import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
+import org.apache.curator.utils.ZKPaths;
+import org.apache.curator.x.async.AsyncCuratorFramework;
+import org.apache.zookeeper.CreateMode;
+import java.security.MessageDigest;
+import java.security.NoSuchAlgorithmException;
+import java.time.Duration;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Comparator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+import java.util.concurrent.Executor;
+import java.util.concurrent.TimeUnit;
+import java.util.stream.Collectors;
+
+import static org.apache.curator.x.async.AsyncWrappers.*;
+
+/**
+ * Manages migrations
+ */
+public class MigrationManager
+{
+    private final AsyncCuratorFramework client;
+    private final String lockPath;
+    private final Executor executor;
+    private final Duration lockMax;
+
+    private static final String META_DATA_NODE_NAME = "meta-";
+
+    /**
+     * @param client the curator client
+     * @param lockPath base path for locks used by the manager
+     * @param executor the executor to use
+     * @param lockMax max time to wait for locks
+     */
+    public MigrationManager(AsyncCuratorFramework client, String lockPath, 
Executor executor, Duration lockMax)
+    {
+        this.client = Objects.requireNonNull(client, "client cannot be null");
+        this.lockPath = Objects.requireNonNull(lockPath, "lockPath cannot be 
null");
+        this.executor = Objects.requireNonNull(executor, "executor cannot be 
null");
+        this.lockMax = Objects.requireNonNull(lockMax, "lockMax cannot be 
null");
+    }
+
+    /**
+     * Process the given migration set
+     *
+     * @param set the set
+     * @return completion stage. If there is a migration-specific error, the 
stage will be completed
+     * exceptionally with {@link 
org.apache.curator.x.async.migrations.MigrationException}.
+     */
+    public CompletionStage<Void> migrate(MigrationSet set)
+    {
+        InterProcessLock lock = new 
InterProcessSemaphoreMutex(client.unwrap(), ZKPaths.makePath(lockPath, 
set.id()));
+        CompletionStage<Void> lockStage = lockAsync(lock, lockMax.toMillis(), 
TimeUnit.MILLISECONDS, executor);
+        return lockStage.thenCompose(__ -> runMigrationInLock(lock, set));
+    }
+
+    /**
+     * Can be overridden to change how the comparison to previous migrations 
is done. The default
+     * version ensures that the meta data from previous migrations matches the 
current migration
+     * set exactly (by order and operation hash). If there is a mismatch, 
<code>MigrationException</code> is thrown.
+     *
+     * @param set the migration set being applied
+     * @param operationHashesInOrder previous operation hashes (may be empty)
+     * @return the list of actual migrations to perform. The filter can return 
any value here or an empty list.
+     * @throws MigrationException errors
+     */
+    protected List<Migration> filter(MigrationSet set, List<byte[]> 
operationHashesInOrder) throws MigrationException
+    {
+        if ( operationHashesInOrder.size() > set.migrations().size() )
+        {
+            throw new MigrationException(set.id(), String.format("More 
metadata than migrations. Migration ID: %s", set.id()));
+        }
+
+        int compareSize = Math.min(set.migrations().size(), 
operationHashesInOrder.size());
+        List<Migration> subList = set.migrations().subList(0, compareSize);
+        for ( int i = 0; i < compareSize; ++i )
+        {
+            byte[] setHash = hash(set.migrations().get(i).operations());
+            if ( !Arrays.equals(setHash, operationHashesInOrder.get(i)) )
+            {
+                throw new MigrationException(set.id(), String.format("Metadata 
mismatch. Migration ID: %s", set.id()));
+            }
+        }
+        return set.migrations().subList(operationHashesInOrder.size(), 
set.migrations().size());
+    }
+
+    private byte[] hash(List<CuratorOp> operations)
+    {
+        MessageDigest digest;
+        try
+        {
+            digest = MessageDigest.getInstance("SHA-256");
+        }
+        catch ( NoSuchAlgorithmException e )
+        {
+            throw new RuntimeException(e);
+        }
+        operations.forEach(op -> {
+            if ( op instanceof ExtractingCuratorOp )
+            {
+                ((ExtractingCuratorOp)op).addToDigest(digest);
+            }
+            else
+            {
+                digest.update(op.toString().getBytes());
+            }
+        });
+        return digest.digest();
+    }
+
+    private CompletionStage<Void> runMigrationInLock(InterProcessLock lock, 
MigrationSet set)
+    {
+        return childrenWithData(client, set.metaDataPath())
+            .thenCompose(metaData -> applyMetaData(set, metaData))
+            .handle((v, e) -> {
+                release(lock, true);
+                if ( e != null )
+                {
+                    Throwables.propagate(e);
+                }
+                return v;
+            }
+        );
+    }
+
+    private CompletionStage<Void> applyMetaData(MigrationSet set, Map<String, 
byte[]> metaData)
+    {
+        List<byte[]> sortedMetaData = metaData.keySet()
+            .stream()
+            .sorted(Comparator.naturalOrder())
+            .map(metaData::get)
+            .collect(Collectors.toList());
+
+        List<Migration> toBeApplied;
+        try
+        {
+            toBeApplied = filter(set, sortedMetaData);
+        }
+        catch ( MigrationException e )
+        {
+            CompletableFuture<Void> future = new CompletableFuture<>();
+            future.completeExceptionally(e);
+            return future;
+        }
+
+        if ( toBeApplied.size() == 0 )
+        {
+            return CompletableFuture.completedFuture(null);
+        }
+
+        return asyncEnsureContainers(client, set.metaDataPath())
+            .thenCompose(__ -> applyMetaDataAfterEnsure(set, toBeApplied));
+    }
+
+    private CompletionStage<Void> applyMetaDataAfterEnsure(MigrationSet set, 
List<Migration> toBeApplied)
+    {
+        String metaDataBasePath = ZKPaths.makePath(set.metaDataPath(), 
META_DATA_NODE_NAME);
+        List<CompletableFuture<Object>> stages = 
toBeApplied.stream().map(migration -> {
+            List<CuratorOp> operations = new ArrayList<>();
+            operations.addAll(migration.operations());
+            
operations.add(client.transactionOp().create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(metaDataBasePath,
 hash(operations)));
+            return client.transaction().forOperations(operations).thenApply(__ 
-> null).toCompletableFuture();
+        }).collect(Collectors.toList());
+        return CompletableFuture.allOf(stages.toArray(new 
CompletableFuture[stages.size()]));
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationSet.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationSet.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationSet.java
new file mode 100644
index 0000000..089d3d8
--- /dev/null
+++ 
b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationSet.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.migrations;
+
+import com.google.common.collect.ImmutableList;
+import java.util.List;
+import java.util.Objects;
+
+/**
+ * Models a set of migrations. Each individual migration is applied
+ * in a transaction.
+ */
+public interface MigrationSet
+{
+    /**
+     * @return the unique ID for this migration set
+     */
+    String id();
+
+    /**
+     * @return where to store the meta data for this migration set
+     */
+    String metaDataPath();
+
+    /**
+     * @return list of migrations in the order that they should be applied
+     */
+    List<Migration> migrations();
+
+    static MigrationSet build(String id, String metaDataPath, List<Migration> 
migrations)
+    {
+        Objects.requireNonNull(id, "id cannot be null");
+        Objects.requireNonNull(metaDataPath, "metaDataPath cannot be null");
+        final List<Migration> migrationsCopy = 
ImmutableList.copyOf(migrations);
+        return new MigrationSet()
+        {
+            @Override
+            public String id()
+            {
+                return id;
+            }
+
+            @Override
+            public String metaDataPath()
+            {
+                return metaDataPath;
+            }
+
+            @Override
+            public List<Migration> migrations()
+            {
+                return migrationsCopy;
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java
deleted file mode 100644
index 8377967..0000000
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java
+++ /dev/null
@@ -1,25 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.migrations;
-
-@FunctionalInterface
-public interface MetaData
-{
-    byte[] operationHash();
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
deleted file mode 100644
index 972c59e..0000000
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.migrations;
-
-import org.apache.curator.framework.api.transaction.CuratorOp;
-import java.util.List;
-
-/**
- * Models a single migration/transition
- */
-@FunctionalInterface
-public interface Migration
-{
-    /**
-     * Return the operations to execute in a transaction. IMPORTANT: during a 
migration
-     * this method may be called multiple times.
-     *
-     * @return operations
-     */
-    List<CuratorOp> operations();
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationException.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationException.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationException.java
deleted file mode 100644
index 1a1d59c..0000000
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationException.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.migrations;
-
-import java.util.Objects;
-
-public class MigrationException extends RuntimeException
-{
-    private final String migrationId;
-
-    public MigrationException(String migrationId, String message)
-    {
-        super(message);
-        this.migrationId = Objects.requireNonNull(migrationId, "migrationId 
cannot be null");
-    }
-
-    public String getMigrationId()
-    {
-        return migrationId;
-    }
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
----------------------------------------------------------------------
diff --git 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
 
b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
deleted file mode 100644
index d6d37de..0000000
--- 
a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java
+++ /dev/null
@@ -1,220 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.migrations;
-
-import com.google.common.base.Throwables;
-import org.apache.curator.framework.api.transaction.CuratorOp;
-import org.apache.curator.framework.imps.ExtractingCuratorOp;
-import org.apache.curator.framework.recipes.locks.InterProcessLock;
-import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex;
-import org.apache.curator.x.async.AsyncCuratorFramework;
-import org.apache.curator.x.async.modeled.ModelSerializer;
-import org.apache.curator.x.async.modeled.ModelSpec;
-import org.apache.curator.x.async.modeled.ModeledFramework;
-import org.apache.curator.x.async.modeled.ZNode;
-import org.apache.curator.x.async.modeled.ZPath;
-import org.apache.zookeeper.CreateMode;
-import java.security.MessageDigest;
-import java.security.NoSuchAlgorithmException;
-import java.time.Duration;
-import java.util.ArrayList;
-import java.util.Arrays;
-import java.util.Comparator;
-import java.util.List;
-import java.util.Objects;
-import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CompletionStage;
-import java.util.concurrent.Executor;
-import java.util.concurrent.TimeUnit;
-import java.util.stream.Collectors;
-
-import static org.apache.curator.x.async.AsyncWrappers.*;
-
-/**
- * Manages migrations
- */
-public class MigrationManager
-{
-    private final AsyncCuratorFramework client;
-    private final ZPath lockPath;
-    private final Executor executor;
-    private final Duration lockMax;
-
-    private static final String META_DATA_NODE_NAME = "meta-";
-
-    /**
-     * @param client the curator client
-     * @param lockPath base path for locks used by the manager
-     * @param executor the executor to use
-     * @param lockMax max time to wait for locks
-     */
-    public MigrationManager(AsyncCuratorFramework client, ZPath lockPath, Executor executor, Duration lockMax)
-    {
-        this.client = Objects.requireNonNull(client, "client cannot be null");
-        this.lockPath = Objects.requireNonNull(lockPath, "lockPath cannot be null");
-        this.executor = Objects.requireNonNull(executor, "executor cannot be null");
-        this.lockMax = Objects.requireNonNull(lockMax, "lockMax cannot be null");
-    }
-
-    /**
-     * Process the given migration set
-     *
-     * @param set the set
-     * @return completion stage. If there is a migration-specific error, the stage will be completed
-     * exceptionally with {@link org.apache.curator.x.async.modeled.migrations.MigrationException}.
-     */
-    public CompletionStage<Void> migrate(MigrationSet set)
-    {
-        String lockPath = this.lockPath.child(set.id()).fullPath();
-        InterProcessLock lock = new InterProcessSemaphoreMutex(client.unwrap(), lockPath);
-        CompletionStage<Void> lockStage = lockAsync(lock, lockMax.toMillis(), TimeUnit.MILLISECONDS, executor);
-        return lockStage.thenCompose(__ -> runMigrationInLock(lock, set));
-    }
-
-    /**
-     * Can be overridden to change how the comparison to previous migrations is done. The default
-     * version ensures that the meta data from previous migrations matches the current migration
-     * set exactly (by order and version). If there is a mismatch, <code>MigrationException</code> is thrown.
-     *
-     * @param set the migration set being applied
-     * @param sortedMetaData previous migration meta data (may be empty)
-     * @return the list of actual migrations to perform. The filter can return any value here or an empty list.
-     * @throws MigrationException errors
-     */
-    protected List<Migration> filter(MigrationSet set, List<MetaData> sortedMetaData) throws MigrationException
-    {
-        if ( sortedMetaData.size() > set.migrations().size() )
-        {
-            throw new MigrationException(set.id(), String.format("More metadata than migrations. Migration ID: %s", set.id()));
-        }
-
-        int compareSize = Math.min(set.migrations().size(), sortedMetaData.size());
-        List<Migration> subList = set.migrations().subList(0, compareSize);
-        for ( int i = 0; i < compareSize; ++i )
-        {
-            byte[] setHash = hash(set.migrations().get(i).operations()).operationHash();
-            if ( !Arrays.equals(setHash, sortedMetaData.get(i).operationHash()) )
-            {
-                throw new MigrationException(set.id(), String.format("Metadata mismatch. Migration ID: %s", set.id()));
-            }
-        }
-        return set.migrations().subList(sortedMetaData.size(), set.migrations().size());
-    }
-
-    private MetaData hash(List<CuratorOp> operations)
-    {
-        MessageDigest digest;
-        try
-        {
-            digest = MessageDigest.getInstance("SHA-256");
-        }
-        catch ( NoSuchAlgorithmException e )
-        {
-            throw new RuntimeException(e);
-        }
-        operations.forEach(op -> {
-            if ( op instanceof ExtractingCuratorOp )
-            {
-                ((ExtractingCuratorOp)op).addToDigest(digest);
-            }
-            else
-            {
-                digest.update(op.toString().getBytes());
-            }
-        });
-        return digest::digest;
-    }
-
-    private CompletionStage<Void> runMigrationInLock(InterProcessLock lock, MigrationSet set)
-    {
-        ModeledFramework<MetaData> modeled = getMetaDataClient(set.metaDataPath());
-        return modeled.childrenAsZNodes()
-            .thenCompose(metaData -> applyMetaData(set, modeled, metaData))
-            .handle((v, e) -> {
-                release(lock, true);
-                if ( e != null )
-                {
-                    Throwables.propagate(e);
-                }
-                return v;
-            }
-        );
-    }
-
-    private ModeledFramework<MetaData> getMetaDataClient(ZPath metaDataPath)
-    {
-        ModelSerializer<MetaData> serializer = new ModelSerializer<MetaData>()
-        {
-            @Override
-            public byte[] serialize(MetaData model)
-            {
-                return model.operationHash();
-            }
-
-            @Override
-            public MetaData deserialize(byte[] bytes)
-            {
-                return () -> bytes;
-            }
-        };
-        ModelSpec<MetaData> modelSpec = ModelSpec.builder(metaDataPath, serializer).withCreateMode(CreateMode.PERSISTENT_SEQUENTIAL).build();
-        return ModeledFramework.wrap(client, modelSpec);
-    }
-
-    private CompletionStage<Void> applyMetaData(MigrationSet set, ModeledFramework<MetaData> metaDataClient, List<ZNode<MetaData>> metaDataNodes)
-    {
-        List<MetaData> sortedMetaData = metaDataNodes
-            .stream()
-            .sorted(Comparator.comparing(m -> m.path().fullPath()))
-            .map(ZNode::model)
-            .collect(Collectors.toList());
-
-        List<Migration> toBeApplied;
-        try
-        {
-            toBeApplied = filter(set, sortedMetaData);
-        }
-        catch ( MigrationException e )
-        {
-            CompletableFuture<Void> future = new CompletableFuture<>();
-            future.completeExceptionally(e);
-            return future;
-        }
-
-        if ( toBeApplied.size() == 0 )
-        {
-            return CompletableFuture.completedFuture(null);
-        }
-
-        return asyncEnsureContainers(client, metaDataClient.modelSpec().path())
-            .thenCompose(__ -> applyMetaDataAfterEnsure(toBeApplied, metaDataClient));
-    }
-
-    private CompletionStage<Void> applyMetaDataAfterEnsure(List<Migration> toBeApplied, ModeledFramework<MetaData> metaDataClient)
-    {
-        List<CompletableFuture<Object>> stages = toBeApplied.stream().map(migration -> {
-            List<CuratorOp> operations = new ArrayList<>();
-            operations.addAll(migration.operations());
-            MetaData thisMetaData = hash(operations);
-            operations.add(metaDataClient.child(META_DATA_NODE_NAME).createOp(thisMetaData));
-            return client.transaction().forOperations(operations).thenApply(__ -> null).toCompletableFuture();
-        }).collect(Collectors.toList());
-        return CompletableFuture.allOf(stages.toArray(new CompletableFuture[stages.size()]));
-    }
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java
deleted file mode 100644
index c4cd90e..0000000
--- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java
+++ /dev/null
@@ -1,73 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.migrations;
-
-import com.google.common.collect.ImmutableList;
-import org.apache.curator.x.async.modeled.ZPath;
-import java.util.List;
-import java.util.Objects;
-
-/**
- * Models a set of migrations. Each individual migration is applied
- * in a transaction.
- */
-public interface MigrationSet
-{
-    /**
-     * @return the unique ID for this migration set
-     */
-    String id();
-
-    /**
-     * @return where to store the meta data for this migration set
-     */
-    ZPath metaDataPath();
-
-    /**
-     * @return list of migrations in the order that they should be applied
-     */
-    List<Migration> migrations();
-
-    static MigrationSet build(String id, ZPath metaDataPath, List<Migration> migrations)
-    {
-        Objects.requireNonNull(id, "id cannot be null");
-        Objects.requireNonNull(metaDataPath, "metaDataPath cannot be null");
-        final List<Migration> migrationsCopy = ImmutableList.copyOf(migrations);
-        return new MigrationSet()
-        {
-            @Override
-            public String id()
-            {
-                return id;
-            }
-
-            @Override
-            public ZPath metaDataPath()
-            {
-                return metaDataPath;
-            }
-
-            @Override
-            public List<Migration> migrations()
-            {
-                return migrationsCopy;
-            }
-        };
-    }
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
new file mode 100644
index 0000000..42bc76d
--- /dev/null
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.migrations;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.transaction.CuratorOp;
+import org.apache.curator.retry.RetryOneTime;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.curator.x.async.AsyncCuratorFramework;
+import org.apache.curator.x.async.CompletableBaseClassForTests;
+import org.apache.curator.x.async.modeled.JacksonModelSerializer;
+import org.apache.curator.x.async.modeled.ModelSpec;
+import org.apache.curator.x.async.modeled.ModeledFramework;
+import org.apache.curator.x.async.modeled.ZPath;
+import org.apache.curator.x.async.migrations.models.ModelV1;
+import org.apache.curator.x.async.migrations.models.ModelV2;
+import org.apache.curator.x.async.migrations.models.ModelV3;
+import org.testng.Assert;
+import org.testng.annotations.AfterMethod;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+import java.io.IOException;
+import java.time.Duration;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.function.UnaryOperator;
+
+public class TestMigrationManager extends CompletableBaseClassForTests
+{
+    private AsyncCuratorFramework client;
+    private ModelSpec<ModelV1> v1Spec;
+    private ModelSpec<ModelV2> v2Spec;
+    private ModelSpec<ModelV3> v3Spec;
+    private ExecutorService executor;
+    private CuratorOp v1opA;
+    private CuratorOp v1opB;
+    private CuratorOp v2op;
+    private CuratorOp v3op;
+    private MigrationManager manager;
+
+    @BeforeMethod
+    @Override
+    public void setup() throws Exception
+    {
+        super.setup();
+
+        CuratorFramework rawClient = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(100));
+        rawClient.start();
+
+        this.client = AsyncCuratorFramework.wrap(rawClient);
+
+        ObjectMapper mapper = new ObjectMapper();
+        UnaryOperator<byte[]> from1to2 = bytes -> {
+            try
+            {
+                ModelV1 v1 = mapper.readerFor(ModelV1.class).readValue(bytes);
+                ModelV2 v2 = new ModelV2(v1.getName(), 64);
+                return mapper.writeValueAsBytes(v2);
+            }
+            catch ( IOException e )
+            {
+                throw new RuntimeException(e);
+            }
+        };
+
+        UnaryOperator<byte[]> from2to3 = bytes -> {
+            try
+            {
+                ModelV2 v2 = mapper.readerFor(ModelV2.class).readValue(bytes);
+                String[] nameParts = v2.getName().split("\\s");
+                ModelV3 v3 = new ModelV3(nameParts[0], nameParts[1], v2.getAge());
+                return mapper.writeValueAsBytes(v3);
+            }
+            catch ( IOException e )
+            {
+                throw new RuntimeException(e);
+            }
+        };
+
+        ZPath modelPath = ZPath.parse("/test/it");
+
+        v1Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV1.class)).build();
+        v2Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV2.class)).build();
+        v3Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV3.class)).build();
+
+        v1opA = client.unwrap().transactionOp().create().forPath(v1Spec.path().parent().fullPath());
+        v1opB = ModeledFramework.wrap(client, v1Spec).createOp(new ModelV1("Test"));
+        v2op = ModeledFramework.wrap(client, v2Spec).updateOp(new ModelV2("Test 2", 10));
+        v3op = ModeledFramework.wrap(client, v3Spec).updateOp(new ModelV3("One", "Two", 30));
+
+        executor = Executors.newCachedThreadPool();
+        manager = new MigrationManager(client, ZPath.parse("/locks"), executor, Duration.ofMinutes(10));
+    }
+
+    @AfterMethod
+    @Override
+    public void teardown() throws Exception
+    {
+        CloseableUtils.closeQuietly(client.unwrap());
+        executor.shutdownNow();
+        super.teardown();
+    }
+
+    @Test
+    public void testBasic() throws Exception
+    {
+        Migration m1 = () -> Arrays.asList(v1opA, v1opB);
+        Migration m2 = () -> Collections.singletonList(v2op);
+        Migration m3 = () -> Collections.singletonList(v3op);
+        MigrationSet migrationSet = MigrationSet.build("1", "/metadata", Arrays.asList(m1, m2, m3));
+
+        complete(manager.migrate(migrationSet));
+
+        ModeledFramework<ModelV3> v3Client = ModeledFramework.wrap(client, v3Spec);
+        complete(v3Client.read(), (m, e) -> {
+            Assert.assertEquals(m.getAge(), 30);
+            Assert.assertEquals(m.getFirstName(), "One");
+            Assert.assertEquals(m.getLastName(), "Two");
+        });
+    }
+
+    @Test
+    public void testStaged() throws Exception
+    {
+        Migration m1 = () -> Arrays.asList(v1opA, v1opB);
+        MigrationSet migrationSet = MigrationSet.build("1", "/metadata/nodes", Collections.singletonList(m1));
+        complete(manager.migrate(migrationSet));
+
+        ModeledFramework<ModelV1> v1Client = ModeledFramework.wrap(client, v1Spec);
+        complete(v1Client.read(), (m, e) -> Assert.assertEquals(m.getName(), "Test"));
+
+        Migration m2 = () -> Collections.singletonList(v2op);
+        migrationSet = MigrationSet.build("1", "/metadata/nodes", Arrays.asList(m1, m2));
+        complete(manager.migrate(migrationSet));
+
+        ModeledFramework<ModelV2> v2Client = ModeledFramework.wrap(client, v2Spec);
+        complete(v2Client.read(), (m, e) -> {
+            Assert.assertEquals(m.getName(), "Test 2");
+            Assert.assertEquals(m.getAge(), 10);
+        });
+
+        Migration m3 = () -> Collections.singletonList(v3op);
+        migrationSet = MigrationSet.build("1", "/metadata/nodes", Arrays.asList(m1, m2, m3));
+        complete(manager.migrate(migrationSet));
+
+        ModeledFramework<ModelV3> v3Client = ModeledFramework.wrap(client, v3Spec);
+        complete(v3Client.read(), (m, e) -> {
+            Assert.assertEquals(m.getAge(), 30);
+            Assert.assertEquals(m.getFirstName(), "One");
+            Assert.assertEquals(m.getLastName(), "Two");
+        });
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV1.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV1.java b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV1.java
new file mode 100644
index 0000000..46fc5ff
--- /dev/null
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV1.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.migrations.models;
+
+public class ModelV1
+{
+    private final String name;
+
+    public ModelV1()
+    {
+        this("");
+    }
+
+    public ModelV1(String name)
+    {
+        this.name = name;
+    }
+
+    public String getName()
+    {
+        return name;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV2.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV2.java b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV2.java
new file mode 100644
index 0000000..31e05bd
--- /dev/null
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV2.java
@@ -0,0 +1,46 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.migrations.models;
+
+public class ModelV2
+{
+    private final String name;
+    private final int age;
+
+    public ModelV2()
+    {
+        this("", 0);
+    }
+
+    public ModelV2(String name, int age)
+    {
+        this.name = name;
+        this.age = age;
+    }
+
+    public String getName()
+    {
+        return name;
+    }
+
+    public int getAge()
+    {
+        return age;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV3.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV3.java b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV3.java
new file mode 100644
index 0000000..519923c
--- /dev/null
+++ b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV3.java
@@ -0,0 +1,53 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.curator.x.async.migrations.models;
+
+public class ModelV3
+{
+    private final String firstName;
+    private final String lastName;
+    private final int age;
+
+    public ModelV3()
+    {
+        this("", "", 0);
+    }
+
+    public ModelV3(String firstName, String lastName, int age)
+    {
+        this.firstName = firstName;
+        this.lastName = lastName;
+        this.age = age;
+    }
+
+    public String getFirstName()
+    {
+        return firstName;
+    }
+
+    public String getLastName()
+    {
+        return lastName;
+    }
+
+    public int getAge()
+    {
+        return age;
+    }
+}

http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
deleted file mode 100644
index 45aa130..0000000
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java
+++ /dev/null
@@ -1,173 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.migrations;
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import org.apache.curator.framework.CuratorFramework;
-import org.apache.curator.framework.CuratorFrameworkFactory;
-import org.apache.curator.framework.api.transaction.CuratorOp;
-import org.apache.curator.retry.RetryOneTime;
-import org.apache.curator.utils.CloseableUtils;
-import org.apache.curator.x.async.AsyncCuratorFramework;
-import org.apache.curator.x.async.CompletableBaseClassForTests;
-import org.apache.curator.x.async.modeled.JacksonModelSerializer;
-import org.apache.curator.x.async.modeled.ModelSpec;
-import org.apache.curator.x.async.modeled.ModeledFramework;
-import org.apache.curator.x.async.modeled.ZPath;
-import org.apache.curator.x.async.modeled.migrations.models.ModelV1;
-import org.apache.curator.x.async.modeled.migrations.models.ModelV2;
-import org.apache.curator.x.async.modeled.migrations.models.ModelV3;
-import org.testng.Assert;
-import org.testng.annotations.AfterMethod;
-import org.testng.annotations.BeforeMethod;
-import org.testng.annotations.Test;
-import java.io.IOException;
-import java.time.Duration;
-import java.util.Arrays;
-import java.util.Collections;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.function.UnaryOperator;
-
-public class TestMigrationManager extends CompletableBaseClassForTests
-{
-    private AsyncCuratorFramework client;
-    private ModelSpec<ModelV1> v1Spec;
-    private ModelSpec<ModelV2> v2Spec;
-    private ModelSpec<ModelV3> v3Spec;
-    private ExecutorService executor;
-    private CuratorOp v1opA;
-    private CuratorOp v1opB;
-    private CuratorOp v2op;
-    private CuratorOp v3op;
-    private MigrationManager manager;
-
-    @BeforeMethod
-    @Override
-    public void setup() throws Exception
-    {
-        super.setup();
-
-        CuratorFramework rawClient = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(100));
-        rawClient.start();
-
-        this.client = AsyncCuratorFramework.wrap(rawClient);
-
-        ObjectMapper mapper = new ObjectMapper();
-        UnaryOperator<byte[]> from1to2 = bytes -> {
-            try
-            {
-                ModelV1 v1 = mapper.readerFor(ModelV1.class).readValue(bytes);
-                ModelV2 v2 = new ModelV2(v1.getName(), 64);
-                return mapper.writeValueAsBytes(v2);
-            }
-            catch ( IOException e )
-            {
-                throw new RuntimeException(e);
-            }
-        };
-
-        UnaryOperator<byte[]> from2to3 = bytes -> {
-            try
-            {
-                ModelV2 v2 = mapper.readerFor(ModelV2.class).readValue(bytes);
-                String[] nameParts = v2.getName().split("\\s");
-                ModelV3 v3 = new ModelV3(nameParts[0], nameParts[1], v2.getAge());
-                return mapper.writeValueAsBytes(v3);
-            }
-            catch ( IOException e )
-            {
-                throw new RuntimeException(e);
-            }
-        };
-
-        ZPath modelPath = ZPath.parse("/test/it");
-
-        v1Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV1.class)).build();
-        v2Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV2.class)).build();
-        v3Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV3.class)).build();
-
-        v1opA = client.unwrap().transactionOp().create().forPath(v1Spec.path().parent().fullPath());
-        v1opB = ModeledFramework.wrap(client, v1Spec).createOp(new ModelV1("Test"));
-        v2op = ModeledFramework.wrap(client, v2Spec).updateOp(new ModelV2("Test 2", 10));
-        v3op = ModeledFramework.wrap(client, v3Spec).updateOp(new ModelV3("One", "Two", 30));
-
-        executor = Executors.newCachedThreadPool();
-        manager = new MigrationManager(client, ZPath.parse("/locks"), executor, Duration.ofMinutes(10));
-    }
-
-    @AfterMethod
-    @Override
-    public void teardown() throws Exception
-    {
-        CloseableUtils.closeQuietly(client.unwrap());
-        executor.shutdownNow();
-        super.teardown();
-    }
-
-    @Test
-    public void testBasic() throws Exception
-    {
-        Migration m1 = () -> Arrays.asList(v1opA, v1opB);
-        Migration m2 = () -> Collections.singletonList(v2op);
-        Migration m3 = () -> Collections.singletonList(v3op);
-        MigrationSet migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Arrays.asList(m1, m2, m3));
-
-        complete(manager.migrate(migrationSet));
-
-        ModeledFramework<ModelV3> v3Client = ModeledFramework.wrap(client, v3Spec);
-        complete(v3Client.read(), (m, e) -> {
-            Assert.assertEquals(m.getAge(), 30);
-            Assert.assertEquals(m.getFirstName(), "One");
-            Assert.assertEquals(m.getLastName(), "Two");
-        });
-    }
-
-    @Test
-    public void testStaged() throws Exception
-    {
-        Migration m1 = () -> Arrays.asList(v1opA, v1opB);
-        MigrationSet migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Collections.singletonList(m1));
-        complete(manager.migrate(migrationSet));
-
-        ModeledFramework<ModelV1> v1Client = ModeledFramework.wrap(client, v1Spec);
-        complete(v1Client.read(), (m, e) -> Assert.assertEquals(m.getName(), "Test"));
-
-        Migration m2 = () -> Collections.singletonList(v2op);
-        migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Arrays.asList(m1, m2));
-        complete(manager.migrate(migrationSet));
-
-        ModeledFramework<ModelV2> v2Client = ModeledFramework.wrap(client, v2Spec);
-        complete(v2Client.read(), (m, e) -> {
-            Assert.assertEquals(m.getName(), "Test 2");
-            Assert.assertEquals(m.getAge(), 10);
-        });
-
-        Migration m3 = () -> Collections.singletonList(v3op);
-        migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Arrays.asList(m1, m2, m3));
-        complete(manager.migrate(migrationSet));
-
-        ModeledFramework<ModelV3> v3Client = ModeledFramework.wrap(client, v3Spec);
-        complete(v3Client.read(), (m, e) -> {
-            Assert.assertEquals(m.getAge(), 30);
-            Assert.assertEquals(m.getFirstName(), "One");
-            Assert.assertEquals(m.getLastName(), "Two");
-        });
-    }
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV1.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV1.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV1.java
deleted file mode 100644
index 02b13b7..0000000
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV1.java
+++ /dev/null
@@ -1,39 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.migrations.models;
-
-public class ModelV1
-{
-    private final String name;
-
-    public ModelV1()
-    {
-        this("");
-    }
-
-    public ModelV1(String name)
-    {
-        this.name = name;
-    }
-
-    public String getName()
-    {
-        return name;
-    }
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV2.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV2.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV2.java
deleted file mode 100644
index bd77a2e..0000000
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV2.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.migrations.models;
-
-public class ModelV2
-{
-    private final String name;
-    private final int age;
-
-    public ModelV2()
-    {
-        this("", 0);
-    }
-
-    public ModelV2(String name, int age)
-    {
-        this.name = name;
-        this.age = age;
-    }
-
-    public String getName()
-    {
-        return name;
-    }
-
-    public int getAge()
-    {
-        return age;
-    }
-}

http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV3.java
----------------------------------------------------------------------
diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV3.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV3.java
deleted file mode 100644
index d4713b8..0000000
--- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV3.java
+++ /dev/null
@@ -1,53 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *   http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.curator.x.async.modeled.migrations.models;
-
-public class ModelV3
-{
-    private final String firstName;
-    private final String lastName;
-    private final int age;
-
-    public ModelV3()
-    {
-        this("", "", 0);
-    }
-
-    public ModelV3(String firstName, String lastName, int age)
-    {
-        this.firstName = firstName;
-        this.lastName = lastName;
-        this.age = age;
-    }
-
-    public String getFirstName()
-    {
-        return firstName;
-    }
-
-    public String getLastName()
-    {
-        return lastName;
-    }
-
-    public int getAge()
-    {
-        return age;
-    }
-}

Reply via email to