major refactoring and simplification. No longer dependent on any modeled code so it's moved to the parent async package
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/d80651a7 Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/d80651a7 Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/d80651a7 Branch: refs/heads/master Commit: d80651a7a276b0ce999601c30e205aceaae94d4c Parents: 2ab172a Author: randgalt <[email protected]> Authored: Fri Jul 14 12:32:37 2017 -0500 Committer: randgalt <[email protected]> Committed: Fri Jul 14 12:32:37 2017 -0500 ---------------------------------------------------------------------- .../apache/curator/x/async/AsyncWrappers.java | 96 +++++++- .../curator/x/async/migrations/Migration.java | 37 ++++ .../x/async/migrations/MigrationException.java | 37 ++++ .../x/async/migrations/MigrationManager.java | 195 ++++++++++++++++ .../x/async/migrations/MigrationSet.java | 72 ++++++ .../x/async/modeled/migrations/MetaData.java | 25 --- .../x/async/modeled/migrations/Migration.java | 37 ---- .../modeled/migrations/MigrationException.java | 37 ---- .../modeled/migrations/MigrationManager.java | 220 ------------------- .../async/modeled/migrations/MigrationSet.java | 73 ------ .../async/migrations/TestMigrationManager.java | 173 +++++++++++++++ .../x/async/migrations/models/ModelV1.java | 39 ++++ .../x/async/migrations/models/ModelV2.java | 46 ++++ .../x/async/migrations/models/ModelV3.java | 53 +++++ .../migrations/TestMigrationManager.java | 173 --------------- .../modeled/migrations/models/ModelV1.java | 39 ---- .../modeled/migrations/models/ModelV2.java | 46 ---- .../modeled/migrations/models/ModelV3.java | 53 ----- 18 files changed, 747 insertions(+), 704 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java index 9630985..d7b3cc3 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/AsyncWrappers.java @@ -18,12 +18,16 @@ */ package org.apache.curator.x.async; +import com.google.common.base.Throwables; +import com.google.common.collect.Maps; import org.apache.curator.framework.recipes.locks.InterProcessLock; import org.apache.curator.utils.ThreadUtils; import org.apache.curator.utils.ZKPaths; import org.apache.curator.x.async.api.ExistsOption; -import org.apache.curator.x.async.modeled.ZPath; +import org.apache.zookeeper.KeeperException; import java.util.Collections; +import java.util.List; +import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; @@ -70,6 +74,66 @@ import java.util.concurrent.TimeUnit; public class AsyncWrappers { /** + * <p> + * Return the children of the given path (keyed by the full path) and the data for each node. + * IMPORTANT: this results in a ZooKeeper query + * for each child node returned. i.e. if the initial children() call returns + * 10 nodes an additional 10 ZooKeeper queries are made to get the data. + * </p> + * + * <p> + * Note: if the any of the nodes in the path do not exist yet, {@link org.apache.zookeeper.KeeperException.NoNodeException} + * is <strong>NOT</strong> set. Instead the stage is completed with an empty map. + * </p> + * + * @return CompletionStage + */ + public static CompletionStage<Map<String, byte[]>> childrenWithData(AsyncCuratorFramework client, String path) + { + return childrenWithData(client, path, false); + } + + /** + * <p> + * Return the children of the given path (keyed by the full path) and the data for each node. + * IMPORTANT: this results in a ZooKeeper query + * for each child node returned. i.e. if the initial children() call returns + * 10 nodes an additional 10 ZooKeeper queries are made to get the data. + * </p> + * + * <p> + * Note: if the any of the nodes in the path do not exist yet, {@link org.apache.zookeeper.KeeperException.NoNodeException} + * is <strong>NOT</strong> set. Instead the stage is completed with an empty map. + * </p> + * + * @param isCompressed pass true if data is compressed + * @return CompletionStage + */ + public static CompletionStage<Map<String, byte[]>> childrenWithData(AsyncCuratorFramework client, String path, boolean isCompressed) + { + CompletableFuture<Map<String, byte[]>> future = new CompletableFuture<>(); + client.getChildren().forPath(path).handle((children, e) -> { + if ( e != null ) + { + if ( Throwables.getRootCause(e) instanceof KeeperException.NoNodeException ) + { + future.complete(Maps.newHashMap()); + } + else + { + future.completeExceptionally(e); + } + } + else + { + completeChildren(client, future, path, children, isCompressed); + } + return null; + }); + return future; + } + + /** * Asynchronously call {@link org.apache.curator.framework.CuratorFramework#createContainers(String)} using * the given executor * @@ -279,6 +343,36 @@ public class AsyncWrappers } } + private static void completeChildren(AsyncCuratorFramework client, CompletableFuture<Map<String, byte[]>> future, String parentPath, List<String> children, boolean isCompressed) + { + Map<String, byte[]> nodes = Maps.newHashMap(); + if ( children.size() == 0 ) + { + future.complete(nodes); + return; + } + + children.forEach(node -> { + String path = ZKPaths.makePath(parentPath, node); + AsyncStage<byte[]> stage = isCompressed ? client.getData().decompressed().forPath(path) : client.getData().forPath(path); + stage.handle((data, e) -> { + if ( e != null ) + { + future.completeExceptionally(e); + } + else + { + nodes.put(path, data); + if ( nodes.size() == children.size() ) + { + future.complete(nodes); + } + } + return null; + }); + }); + } + private AsyncWrappers() { } http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/Migration.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/Migration.java b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/Migration.java new file mode 100644 index 0000000..daac435 --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/Migration.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.x.async.migrations; + +import org.apache.curator.framework.api.transaction.CuratorOp; +import java.util.List; + +/** + * Models a single migration/transition + */ +@FunctionalInterface +public interface Migration +{ + /** + * Return the operations to execute in a transaction. IMPORTANT: during a migration + * this method may be called multiple times. + * + * @return operations + */ + List<CuratorOp> operations(); +} http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationException.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationException.java b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationException.java new file mode 100644 index 0000000..c4971ce --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationException.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.x.async.migrations; + +import java.util.Objects; + +public class MigrationException extends RuntimeException +{ + private final String migrationId; + + public MigrationException(String migrationId, String message) + { + super(message); + this.migrationId = Objects.requireNonNull(migrationId, "migrationId cannot be null"); + } + + public String getMigrationId() + { + return migrationId; + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java new file mode 100644 index 0000000..cb3d6ff --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationManager.java @@ -0,0 +1,195 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.x.async.migrations; + +import com.google.common.base.Throwables; +import org.apache.curator.framework.api.transaction.CuratorOp; +import org.apache.curator.framework.imps.ExtractingCuratorOp; +import org.apache.curator.framework.recipes.locks.InterProcessLock; +import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex; +import org.apache.curator.utils.ZKPaths; +import org.apache.curator.x.async.AsyncCuratorFramework; +import org.apache.zookeeper.CreateMode; +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Comparator; +import java.util.List; +import java.util.Map; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; + +import static org.apache.curator.x.async.AsyncWrappers.*; + +/** + * Manages migrations + */ +public class MigrationManager +{ + private final AsyncCuratorFramework client; + private final String lockPath; + private final Executor executor; + private final Duration lockMax; + + private static final String META_DATA_NODE_NAME = "meta-"; + + /** + * @param client the curator client + * @param lockPath base path for locks used by the manager + * @param executor the executor to use + * @param lockMax max time to wait for locks + */ + public MigrationManager(AsyncCuratorFramework client, String lockPath, Executor executor, Duration lockMax) + { + this.client = Objects.requireNonNull(client, "client cannot be null"); + this.lockPath = Objects.requireNonNull(lockPath, "lockPath cannot be null"); + this.executor = Objects.requireNonNull(executor, "executor cannot be null"); + this.lockMax = Objects.requireNonNull(lockMax, "lockMax cannot be null"); + } + + /** + * Process the given migration set + * + * @param set the set + * @return completion stage. If there is a migration-specific error, the stage will be completed + * exceptionally with {@link org.apache.curator.x.async.migrations.MigrationException}. + */ + public CompletionStage<Void> migrate(MigrationSet set) + { + InterProcessLock lock = new InterProcessSemaphoreMutex(client.unwrap(), ZKPaths.makePath(lockPath, set.id())); + CompletionStage<Void> lockStage = lockAsync(lock, lockMax.toMillis(), TimeUnit.MILLISECONDS, executor); + return lockStage.thenCompose(__ -> runMigrationInLock(lock, set)); + } + + /** + * Can be overridden to change how the comparison to previous migrations is done. The default + * version ensures that the meta data from previous migrations matches the current migration + * set exactly (by order and version). If there is a mismatch, <code>MigrationException</code> is thrown. + * + * @param set the migration set being applied + * @param operationHashesInOrder previous operation hashes (may be empty) + * @return the list of actual migrations to perform. The filter can return any value here or an empty list. + * @throws MigrationException errors + */ + protected List<Migration> filter(MigrationSet set, List<byte[]> operationHashesInOrder) throws MigrationException + { + if ( operationHashesInOrder.size() > set.migrations().size() ) + { + throw new MigrationException(set.id(), String.format("More metadata than migrations. Migration ID: %s", set.id())); + } + + int compareSize = Math.min(set.migrations().size(), operationHashesInOrder.size()); + List<Migration> subList = set.migrations().subList(0, compareSize); + for ( int i = 0; i < compareSize; ++i ) + { + byte[] setHash = hash(set.migrations().get(i).operations()); + if ( !Arrays.equals(setHash, operationHashesInOrder.get(i)) ) + { + throw new MigrationException(set.id(), String.format("Metadata mismatch. Migration ID: %s", set.id())); + } + } + return set.migrations().subList(operationHashesInOrder.size(), set.migrations().size()); + } + + private byte[] hash(List<CuratorOp> operations) + { + MessageDigest digest; + try + { + digest = MessageDigest.getInstance("SHA-256"); + } + catch ( NoSuchAlgorithmException e ) + { + throw new RuntimeException(e); + } + operations.forEach(op -> { + if ( op instanceof ExtractingCuratorOp ) + { + ((ExtractingCuratorOp)op).addToDigest(digest); + } + else + { + digest.update(op.toString().getBytes()); + } + }); + return digest.digest(); + } + + private CompletionStage<Void> runMigrationInLock(InterProcessLock lock, MigrationSet set) + { + return childrenWithData(client, set.metaDataPath()) + .thenCompose(metaData -> applyMetaData(set, metaData)) + .handle((v, e) -> { + release(lock, true); + if ( e != null ) + { + Throwables.propagate(e); + } + return v; + } + ); + } + + private CompletionStage<Void> applyMetaData(MigrationSet set, Map<String, byte[]> metaData) + { + List<byte[]> sortedMetaData = metaData.keySet() + .stream() + .sorted(Comparator.naturalOrder()) + .map(metaData::get) + .collect(Collectors.toList()); + + List<Migration> toBeApplied; + try + { + toBeApplied = filter(set, sortedMetaData); + } + catch ( MigrationException e ) + { + CompletableFuture<Void> future = new CompletableFuture<>(); + future.completeExceptionally(e); + return future; + } + + if ( toBeApplied.size() == 0 ) + { + return CompletableFuture.completedFuture(null); + } + + return asyncEnsureContainers(client, set.metaDataPath()) + .thenCompose(__ -> applyMetaDataAfterEnsure(set, toBeApplied)); + } + + private CompletionStage<Void> applyMetaDataAfterEnsure(MigrationSet set, List<Migration> toBeApplied) + { + String metaDataBasePath = ZKPaths.makePath(set.metaDataPath(), META_DATA_NODE_NAME); + List<CompletableFuture<Object>> stages = toBeApplied.stream().map(migration -> { + List<CuratorOp> operations = new ArrayList<>(); + operations.addAll(migration.operations()); + operations.add(client.transactionOp().create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).forPath(metaDataBasePath, hash(operations))); + return client.transaction().forOperations(operations).thenApply(__ -> null).toCompletableFuture(); + }).collect(Collectors.toList()); + return CompletableFuture.allOf(stages.toArray(new CompletableFuture[stages.size()])); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationSet.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationSet.java b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationSet.java new file mode 100644 index 0000000..089d3d8 --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/migrations/MigrationSet.java @@ -0,0 +1,72 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.x.async.migrations; + +import com.google.common.collect.ImmutableList; +import java.util.List; +import java.util.Objects; + +/** + * Models a set of migrations. Each individual migration is applied + * in a transaction. + */ +public interface MigrationSet +{ + /** + * @return the unique ID for this migration set + */ + String id(); + + /** + * @return where to store the meta data for this migration set + */ + String metaDataPath(); + + /** + * @return list of migrations in the order that they should be applied + */ + List<Migration> migrations(); + + static MigrationSet build(String id, String metaDataPath, List<Migration> migrations) + { + Objects.requireNonNull(id, "id cannot be null"); + Objects.requireNonNull(metaDataPath, "metaDataPath cannot be null"); + final List<Migration> migrationsCopy = ImmutableList.copyOf(migrations); + return new MigrationSet() + { + @Override + public String id() + { + return id; + } + + @Override + public String metaDataPath() + { + return metaDataPath; + } + + @Override + public List<Migration> migrations() + { + return migrationsCopy; + } + }; + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java deleted file mode 100644 index 8377967..0000000 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MetaData.java +++ /dev/null @@ -1,25 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.curator.x.async.modeled.migrations; - -@FunctionalInterface -public interface MetaData -{ - byte[] operationHash(); -} http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java deleted file mode 100644 index 972c59e..0000000 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.curator.x.async.modeled.migrations; - -import org.apache.curator.framework.api.transaction.CuratorOp; -import java.util.List; - -/** - * Models a single migration/transition - */ -@FunctionalInterface -public interface Migration -{ - /** - * Return the operations to execute in a transaction. IMPORTANT: during a migration - * this method may be called multiple times. - * - * @return operations - */ - List<CuratorOp> operations(); -} http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationException.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationException.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationException.java deleted file mode 100644 index 1a1d59c..0000000 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationException.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.curator.x.async.modeled.migrations; - -import java.util.Objects; - -public class MigrationException extends RuntimeException -{ - private final String migrationId; - - public MigrationException(String migrationId, String message) - { - super(message); - this.migrationId = Objects.requireNonNull(migrationId, "migrationId cannot be null"); - } - - public String getMigrationId() - { - return migrationId; - } -} http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java deleted file mode 100644 index d6d37de..0000000 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java +++ /dev/null @@ -1,220 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.curator.x.async.modeled.migrations; - -import com.google.common.base.Throwables; -import org.apache.curator.framework.api.transaction.CuratorOp; -import org.apache.curator.framework.imps.ExtractingCuratorOp; -import org.apache.curator.framework.recipes.locks.InterProcessLock; -import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex; -import org.apache.curator.x.async.AsyncCuratorFramework; -import org.apache.curator.x.async.modeled.ModelSerializer; -import org.apache.curator.x.async.modeled.ModelSpec; -import org.apache.curator.x.async.modeled.ModeledFramework; -import org.apache.curator.x.async.modeled.ZNode; -import org.apache.curator.x.async.modeled.ZPath; -import org.apache.zookeeper.CreateMode; -import java.security.MessageDigest; -import java.security.NoSuchAlgorithmException; -import java.time.Duration; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.Comparator; -import java.util.List; -import java.util.Objects; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - -import static org.apache.curator.x.async.AsyncWrappers.*; - -/** - * Manages migrations - */ -public class MigrationManager -{ - private final AsyncCuratorFramework client; - private final ZPath lockPath; - private final Executor executor; - private final Duration lockMax; - - private static final String META_DATA_NODE_NAME = "meta-"; - - /** - * @param client the curator client - * @param lockPath base path for locks used by the manager - * @param executor the executor to use - * @param lockMax max time to wait for locks - */ - public MigrationManager(AsyncCuratorFramework client, ZPath lockPath, Executor executor, Duration lockMax) - { - this.client = Objects.requireNonNull(client, "client cannot be null"); - this.lockPath = Objects.requireNonNull(lockPath, "lockPath cannot be null"); - this.executor = Objects.requireNonNull(executor, "executor cannot be null"); - this.lockMax = Objects.requireNonNull(lockMax, "lockMax cannot be null"); - } - - /** - * Process the given migration set - * - * @param set the set - * @return completion stage. If there is a migration-specific error, the stage will be completed - * exceptionally with {@link org.apache.curator.x.async.modeled.migrations.MigrationException}. - */ - public CompletionStage<Void> migrate(MigrationSet set) - { - String lockPath = this.lockPath.child(set.id()).fullPath(); - InterProcessLock lock = new InterProcessSemaphoreMutex(client.unwrap(), lockPath); - CompletionStage<Void> lockStage = lockAsync(lock, lockMax.toMillis(), TimeUnit.MILLISECONDS, executor); - return lockStage.thenCompose(__ -> runMigrationInLock(lock, set)); - } - - /** - * Can be overridden to change how the comparison to previous migrations is done. The default - * version ensures that the meta data from previous migrations matches the current migration - * set exactly (by order and version). If there is a mismatch, <code>MigrationException</code> is thrown. - * - * @param set the migration set being applied - * @param sortedMetaData previous migration meta data (may be empty) - * @return the list of actual migrations to perform. The filter can return any value here or an empty list. - * @throws MigrationException errors - */ - protected List<Migration> filter(MigrationSet set, List<MetaData> sortedMetaData) throws MigrationException - { - if ( sortedMetaData.size() > set.migrations().size() ) - { - throw new MigrationException(set.id(), String.format("More metadata than migrations. Migration ID: %s", set.id())); - } - - int compareSize = Math.min(set.migrations().size(), sortedMetaData.size()); - List<Migration> subList = set.migrations().subList(0, compareSize); - for ( int i = 0; i < compareSize; ++i ) - { - byte[] setHash = hash(set.migrations().get(i).operations()).operationHash(); - if ( !Arrays.equals(setHash, sortedMetaData.get(i).operationHash()) ) - { - throw new MigrationException(set.id(), String.format("Metadata mismatch. Migration ID: %s", set.id())); - } - } - return set.migrations().subList(sortedMetaData.size(), set.migrations().size()); - } - - private MetaData hash(List<CuratorOp> operations) - { - MessageDigest digest; - try - { - digest = MessageDigest.getInstance("SHA-256"); - } - catch ( NoSuchAlgorithmException e ) - { - throw new RuntimeException(e); - } - operations.forEach(op -> { - if ( op instanceof ExtractingCuratorOp ) - { - ((ExtractingCuratorOp)op).addToDigest(digest); - } - else - { - digest.update(op.toString().getBytes()); - } - }); - return digest::digest; - } - - private CompletionStage<Void> runMigrationInLock(InterProcessLock lock, MigrationSet set) - { - ModeledFramework<MetaData> modeled = getMetaDataClient(set.metaDataPath()); - return modeled.childrenAsZNodes() - .thenCompose(metaData -> applyMetaData(set, modeled, metaData)) - .handle((v, e) -> { - release(lock, true); - if ( e != null ) - { - Throwables.propagate(e); - } - return v; - } - ); - } - - private ModeledFramework<MetaData> getMetaDataClient(ZPath metaDataPath) - { - ModelSerializer<MetaData> serializer = new ModelSerializer<MetaData>() - { - @Override - public byte[] serialize(MetaData model) - { - return model.operationHash(); - } - - @Override - public MetaData deserialize(byte[] bytes) - { - return () -> bytes; - } - }; - ModelSpec<MetaData> modelSpec = ModelSpec.builder(metaDataPath, serializer).withCreateMode(CreateMode.PERSISTENT_SEQUENTIAL).build(); - return ModeledFramework.wrap(client, modelSpec); - } - - private CompletionStage<Void> applyMetaData(MigrationSet set, ModeledFramework<MetaData> metaDataClient, List<ZNode<MetaData>> metaDataNodes) - { - List<MetaData> sortedMetaData = metaDataNodes - .stream() - .sorted(Comparator.comparing(m -> m.path().fullPath())) - .map(ZNode::model) - .collect(Collectors.toList()); - - List<Migration> toBeApplied; - try - { - toBeApplied = filter(set, sortedMetaData); - } - catch ( MigrationException e ) - { - CompletableFuture<Void> future = new CompletableFuture<>(); - future.completeExceptionally(e); - return future; - } - - if ( toBeApplied.size() == 0 ) - { - return CompletableFuture.completedFuture(null); - } - - return asyncEnsureContainers(client, metaDataClient.modelSpec().path()) - .thenCompose(__ -> applyMetaDataAfterEnsure(toBeApplied, metaDataClient)); - } - - private CompletionStage<Void> applyMetaDataAfterEnsure(List<Migration> toBeApplied, ModeledFramework<MetaData> metaDataClient) - { - List<CompletableFuture<Object>> stages = toBeApplied.stream().map(migration -> { - List<CuratorOp> operations = new ArrayList<>(); - operations.addAll(migration.operations()); - MetaData thisMetaData = hash(operations); - operations.add(metaDataClient.child(META_DATA_NODE_NAME).createOp(thisMetaData)); - return client.transaction().forOperations(operations).thenApply(__ -> null).toCompletableFuture(); - }).collect(Collectors.toList()); - return CompletableFuture.allOf(stages.toArray(new CompletableFuture[stages.size()])); - } -} http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java deleted file mode 100644 index c4cd90e..0000000 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java +++ /dev/null @@ -1,73 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.curator.x.async.modeled.migrations; - -import com.google.common.collect.ImmutableList; -import org.apache.curator.x.async.modeled.ZPath; -import java.util.List; -import java.util.Objects; - -/** - * Models a set of migrations. Each individual migration is applied - * in a transaction. - */ -public interface MigrationSet -{ - /** - * @return the unique ID for this migration set - */ - String id(); - - /** - * @return where to store the meta data for this migration set - */ - ZPath metaDataPath(); - - /** - * @return list of migrations in the order that they should be applied - */ - List<Migration> migrations(); - - static MigrationSet build(String id, ZPath metaDataPath, List<Migration> migrations) - { - Objects.requireNonNull(id, "id cannot be null"); - Objects.requireNonNull(metaDataPath, "metaDataPath cannot be null"); - final List<Migration> migrationsCopy = ImmutableList.copyOf(migrations); - return new MigrationSet() - { - @Override - public String id() - { - return id; - } - - @Override - public ZPath metaDataPath() - { - return metaDataPath; - } - - @Override - public List<Migration> migrations() - { - return migrationsCopy; - } - }; - } -} http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java new file mode 100644 index 0000000..42bc76d --- /dev/null +++ b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/TestMigrationManager.java @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.x.async.migrations; + +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.curator.framework.CuratorFramework; +import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.transaction.CuratorOp; +import org.apache.curator.retry.RetryOneTime; +import org.apache.curator.utils.CloseableUtils; +import org.apache.curator.x.async.AsyncCuratorFramework; +import org.apache.curator.x.async.CompletableBaseClassForTests; +import org.apache.curator.x.async.modeled.JacksonModelSerializer; +import org.apache.curator.x.async.modeled.ModelSpec; +import org.apache.curator.x.async.modeled.ModeledFramework; +import org.apache.curator.x.async.modeled.ZPath; +import org.apache.curator.x.async.migrations.models.ModelV1; +import org.apache.curator.x.async.migrations.models.ModelV2; +import org.apache.curator.x.async.migrations.models.ModelV3; +import org.testng.Assert; +import org.testng.annotations.AfterMethod; +import org.testng.annotations.BeforeMethod; +import org.testng.annotations.Test; +import java.io.IOException; +import java.time.Duration; +import java.util.Arrays; +import java.util.Collections; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.function.UnaryOperator; + +public class TestMigrationManager extends CompletableBaseClassForTests +{ + private AsyncCuratorFramework client; + private ModelSpec<ModelV1> v1Spec; + private ModelSpec<ModelV2> v2Spec; + private ModelSpec<ModelV3> v3Spec; + private ExecutorService executor; + private CuratorOp v1opA; + private CuratorOp v1opB; + private CuratorOp v2op; + private CuratorOp v3op; + private MigrationManager manager; + + @BeforeMethod + @Override + public void setup() throws Exception + { + super.setup(); + + CuratorFramework rawClient = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(100)); + rawClient.start(); + + this.client = AsyncCuratorFramework.wrap(rawClient); + + ObjectMapper mapper = new ObjectMapper(); + UnaryOperator<byte[]> from1to2 = bytes -> { + try + { + ModelV1 v1 = mapper.readerFor(ModelV1.class).readValue(bytes); + ModelV2 v2 = new ModelV2(v1.getName(), 64); + return mapper.writeValueAsBytes(v2); + } + catch ( IOException e ) + { + throw new RuntimeException(e); + } + }; + + UnaryOperator<byte[]> from2to3 = bytes -> { + try + { + ModelV2 v2 = mapper.readerFor(ModelV2.class).readValue(bytes); + String[] nameParts = v2.getName().split("\\s"); + ModelV3 v3 = new ModelV3(nameParts[0], nameParts[1], v2.getAge()); + return mapper.writeValueAsBytes(v3); + } + catch ( IOException e ) + { + throw new RuntimeException(e); + } + }; + + ZPath modelPath = ZPath.parse("/test/it"); + + v1Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV1.class)).build(); + v2Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV2.class)).build(); + v3Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV3.class)).build(); + + v1opA = client.unwrap().transactionOp().create().forPath(v1Spec.path().parent().fullPath()); + v1opB = ModeledFramework.wrap(client, v1Spec).createOp(new ModelV1("Test")); + v2op = ModeledFramework.wrap(client, v2Spec).updateOp(new ModelV2("Test 2", 10)); + v3op = ModeledFramework.wrap(client, v3Spec).updateOp(new ModelV3("One", "Two", 30)); + + executor = Executors.newCachedThreadPool(); + manager = new MigrationManager(client, ZPath.parse("/locks"), executor, Duration.ofMinutes(10)); + } + + @AfterMethod + @Override + public void teardown() throws Exception + { + CloseableUtils.closeQuietly(client.unwrap()); + executor.shutdownNow(); + super.teardown(); + } + + @Test + public void testBasic() throws Exception + { + Migration m1 = () -> Arrays.asList(v1opA, v1opB); + Migration m2 = () -> Collections.singletonList(v2op); + Migration m3 = () -> Collections.singletonList(v3op); + MigrationSet migrationSet = MigrationSet.build("1", "/metadata", Arrays.asList(m1, m2, m3)); + + complete(manager.migrate(migrationSet)); + + ModeledFramework<ModelV3> v3Client = ModeledFramework.wrap(client, v3Spec); + complete(v3Client.read(), (m, e) -> { + Assert.assertEquals(m.getAge(), 30); + Assert.assertEquals(m.getFirstName(), "One"); + Assert.assertEquals(m.getLastName(), "Two"); + }); + } + + @Test + public void testStaged() throws Exception + { + Migration m1 = () -> Arrays.asList(v1opA, v1opB); + MigrationSet migrationSet = MigrationSet.build("1", "/metadata/nodes", Collections.singletonList(m1)); + complete(manager.migrate(migrationSet)); + + ModeledFramework<ModelV1> v1Client = ModeledFramework.wrap(client, v1Spec); + complete(v1Client.read(), (m, e) -> Assert.assertEquals(m.getName(), "Test")); + + Migration m2 = () -> Collections.singletonList(v2op); + migrationSet = MigrationSet.build("1", "/metadata/nodes", Arrays.asList(m1, m2)); + complete(manager.migrate(migrationSet)); + + ModeledFramework<ModelV2> v2Client = ModeledFramework.wrap(client, v2Spec); + complete(v2Client.read(), (m, e) -> { + Assert.assertEquals(m.getName(), "Test 2"); + Assert.assertEquals(m.getAge(), 10); + }); + + Migration m3 = () -> Collections.singletonList(v3op); + migrationSet = MigrationSet.build("1", "/metadata/nodes", Arrays.asList(m1, m2, m3)); + complete(manager.migrate(migrationSet)); + + ModeledFramework<ModelV3> v3Client = ModeledFramework.wrap(client, v3Spec); + complete(v3Client.read(), (m, e) -> { + Assert.assertEquals(m.getAge(), 30); + Assert.assertEquals(m.getFirstName(), "One"); + Assert.assertEquals(m.getLastName(), "Two"); + }); + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV1.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV1.java b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV1.java new file mode 100644 index 0000000..46fc5ff --- /dev/null +++ b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV1.java @@ -0,0 +1,39 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.x.async.migrations.models; + +public class ModelV1 +{ + private final String name; + + public ModelV1() + { + this(""); + } + + public ModelV1(String name) + { + this.name = name; + } + + public String getName() + { + return name; + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV2.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV2.java b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV2.java new file mode 100644 index 0000000..31e05bd --- /dev/null +++ b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV2.java @@ -0,0 +1,46 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.x.async.migrations.models; + +public class ModelV2 +{ + private final String name; + private final int age; + + public ModelV2() + { + this("", 0); + } + + public ModelV2(String name, int age) + { + this.name = name; + this.age = age; + } + + public String getName() + { + return name; + } + + public int getAge() + { + return age; + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV3.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV3.java b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV3.java new file mode 100644 index 0000000..519923c --- /dev/null +++ b/curator-x-async/src/test/java/org/apache/curator/x/async/migrations/models/ModelV3.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.x.async.migrations.models; + +public class ModelV3 +{ + private final String firstName; + private final String lastName; + private final int age; + + public ModelV3() + { + this("", "", 0); + } + + public ModelV3(String firstName, String lastName, int age) + { + this.firstName = firstName; + this.lastName = lastName; + this.age = age; + } + + public String getFirstName() + { + return firstName; + } + + public String getLastName() + { + return lastName; + } + + public int getAge() + { + return age; + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java deleted file mode 100644 index 45aa130..0000000 --- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java +++ /dev/null @@ -1,173 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.curator.x.async.modeled.migrations; - -import com.fasterxml.jackson.databind.ObjectMapper; -import org.apache.curator.framework.CuratorFramework; -import org.apache.curator.framework.CuratorFrameworkFactory; -import org.apache.curator.framework.api.transaction.CuratorOp; -import org.apache.curator.retry.RetryOneTime; -import org.apache.curator.utils.CloseableUtils; -import org.apache.curator.x.async.AsyncCuratorFramework; -import org.apache.curator.x.async.CompletableBaseClassForTests; -import org.apache.curator.x.async.modeled.JacksonModelSerializer; -import org.apache.curator.x.async.modeled.ModelSpec; -import org.apache.curator.x.async.modeled.ModeledFramework; -import org.apache.curator.x.async.modeled.ZPath; -import org.apache.curator.x.async.modeled.migrations.models.ModelV1; -import org.apache.curator.x.async.modeled.migrations.models.ModelV2; -import org.apache.curator.x.async.modeled.migrations.models.ModelV3; -import org.testng.Assert; -import org.testng.annotations.AfterMethod; -import org.testng.annotations.BeforeMethod; -import org.testng.annotations.Test; -import java.io.IOException; -import java.time.Duration; -import java.util.Arrays; -import java.util.Collections; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; -import java.util.function.UnaryOperator; - -public class TestMigrationManager extends CompletableBaseClassForTests -{ - private AsyncCuratorFramework client; - private ModelSpec<ModelV1> v1Spec; - private ModelSpec<ModelV2> v2Spec; - private ModelSpec<ModelV3> v3Spec; - private ExecutorService executor; - private CuratorOp v1opA; - private CuratorOp v1opB; - private CuratorOp v2op; - private CuratorOp v3op; - private MigrationManager manager; - - @BeforeMethod - @Override - public void setup() throws Exception - { - super.setup(); - - CuratorFramework rawClient = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(100)); - rawClient.start(); - - this.client = AsyncCuratorFramework.wrap(rawClient); - - ObjectMapper mapper = new ObjectMapper(); - UnaryOperator<byte[]> from1to2 = bytes -> { - try - { - ModelV1 v1 = mapper.readerFor(ModelV1.class).readValue(bytes); - ModelV2 v2 = new ModelV2(v1.getName(), 64); - return mapper.writeValueAsBytes(v2); - } - catch ( IOException e ) - { - throw new RuntimeException(e); - } - }; - - UnaryOperator<byte[]> from2to3 = bytes -> { - try - { - ModelV2 v2 = mapper.readerFor(ModelV2.class).readValue(bytes); - String[] nameParts = v2.getName().split("\\s"); - ModelV3 v3 = new ModelV3(nameParts[0], nameParts[1], v2.getAge()); - return mapper.writeValueAsBytes(v3); - } - catch ( IOException e ) - { - throw new RuntimeException(e); - } - }; - - ZPath modelPath = ZPath.parse("/test/it"); - - v1Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV1.class)).build(); - v2Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV2.class)).build(); - v3Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV3.class)).build(); - - v1opA = client.unwrap().transactionOp().create().forPath(v1Spec.path().parent().fullPath()); - v1opB = ModeledFramework.wrap(client, v1Spec).createOp(new ModelV1("Test")); - v2op = ModeledFramework.wrap(client, v2Spec).updateOp(new ModelV2("Test 2", 10)); - v3op = ModeledFramework.wrap(client, v3Spec).updateOp(new ModelV3("One", "Two", 30)); - - executor = Executors.newCachedThreadPool(); - manager = new MigrationManager(client, ZPath.parse("/locks"), executor, Duration.ofMinutes(10)); - } - - @AfterMethod - @Override - public void teardown() throws Exception - { - CloseableUtils.closeQuietly(client.unwrap()); - executor.shutdownNow(); - super.teardown(); - } - - @Test - public void testBasic() throws Exception - { - Migration m1 = () -> Arrays.asList(v1opA, v1opB); - Migration m2 = () -> Collections.singletonList(v2op); - Migration m3 = () -> Collections.singletonList(v3op); - MigrationSet migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Arrays.asList(m1, m2, m3)); - - complete(manager.migrate(migrationSet)); - - ModeledFramework<ModelV3> v3Client = ModeledFramework.wrap(client, v3Spec); - complete(v3Client.read(), (m, e) -> { - Assert.assertEquals(m.getAge(), 30); - Assert.assertEquals(m.getFirstName(), "One"); - Assert.assertEquals(m.getLastName(), "Two"); - }); - } - - @Test - public void testStaged() throws Exception - { - Migration m1 = () -> Arrays.asList(v1opA, v1opB); - MigrationSet migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Collections.singletonList(m1)); - complete(manager.migrate(migrationSet)); - - ModeledFramework<ModelV1> v1Client = ModeledFramework.wrap(client, v1Spec); - complete(v1Client.read(), (m, e) -> Assert.assertEquals(m.getName(), "Test")); - - Migration m2 = () -> Collections.singletonList(v2op); - migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Arrays.asList(m1, m2)); - complete(manager.migrate(migrationSet)); - - ModeledFramework<ModelV2> v2Client = ModeledFramework.wrap(client, v2Spec); - complete(v2Client.read(), (m, e) -> { - Assert.assertEquals(m.getName(), "Test 2"); - Assert.assertEquals(m.getAge(), 10); - }); - - Migration m3 = () -> Collections.singletonList(v3op); - migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Arrays.asList(m1, m2, m3)); - complete(manager.migrate(migrationSet)); - - ModeledFramework<ModelV3> v3Client = ModeledFramework.wrap(client, v3Spec); - complete(v3Client.read(), (m, e) -> { - Assert.assertEquals(m.getAge(), 30); - Assert.assertEquals(m.getFirstName(), "One"); - Assert.assertEquals(m.getLastName(), "Two"); - }); - } -} http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV1.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV1.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV1.java deleted file mode 100644 index 02b13b7..0000000 --- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV1.java +++ /dev/null @@ -1,39 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.curator.x.async.modeled.migrations.models; - -public class ModelV1 -{ - private final String name; - - public ModelV1() - { - this(""); - } - - public ModelV1(String name) - { - this.name = name; - } - - public String getName() - { - return name; - } -} http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV2.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV2.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV2.java deleted file mode 100644 index bd77a2e..0000000 --- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV2.java +++ /dev/null @@ -1,46 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.curator.x.async.modeled.migrations.models; - -public class ModelV2 -{ - private final String name; - private final int age; - - public ModelV2() - { - this("", 0); - } - - public ModelV2(String name, int age) - { - this.name = name; - this.age = age; - } - - public String getName() - { - return name; - } - - public int getAge() - { - return age; - } -} http://git-wip-us.apache.org/repos/asf/curator/blob/d80651a7/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV3.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV3.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV3.java deleted file mode 100644 index d4713b8..0000000 --- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/models/ModelV3.java +++ /dev/null @@ -1,53 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.curator.x.async.modeled.migrations.models; - -public class ModelV3 -{ - private final String firstName; - private final String lastName; - private final int age; - - public ModelV3() - { - this("", "", 0); - } - - public ModelV3(String firstName, String lastName, int age) - { - this.firstName = firstName; - this.lastName = lastName; - this.age = age; - } - - public String getFirstName() - { - return firstName; - } - - public String getLastName() - { - return lastName; - } - - public int getAge() - { - return age; - } -}
