Reworking so that this feature is more general. It now manages any set of transactions.
Project: http://git-wip-us.apache.org/repos/asf/curator/repo Commit: http://git-wip-us.apache.org/repos/asf/curator/commit/1a15582b Tree: http://git-wip-us.apache.org/repos/asf/curator/tree/1a15582b Diff: http://git-wip-us.apache.org/repos/asf/curator/diff/1a15582b Branch: refs/heads/master Commit: 1a15582b59ccfaf80b1547fd57bcb7dfe894b1c8 Parents: 1a0c162 Author: randgalt <[email protected]> Authored: Fri Jul 14 09:24:33 2017 -0500 Committer: randgalt <[email protected]> Committed: Fri Jul 14 09:24:33 2017 -0500 ---------------------------------------------------------------------- .../InvalidMigrationSetException.java | 37 ---- .../x/async/modeled/migrations/Migration.java | 14 +- .../modeled/migrations/MigrationException.java | 37 ++++ .../modeled/migrations/MigrationManager.java | 132 +++++++++++++- .../migrations/MigrationManagerBuilder.java | 68 ------- .../migrations/MigrationManagerImpl.java | 181 ------------------- .../async/modeled/migrations/MigrationSet.java | 11 +- .../migrations/TestMigrationManager.java | 35 ++-- 8 files changed, 196 insertions(+), 319 deletions(-) ---------------------------------------------------------------------- http://git-wip-us.apache.org/repos/asf/curator/blob/1a15582b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/InvalidMigrationSetException.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/InvalidMigrationSetException.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/InvalidMigrationSetException.java deleted file mode 100644 index 84b21bf..0000000 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/InvalidMigrationSetException.java +++ /dev/null @@ -1,37 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. 
See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.curator.x.async.modeled.migrations; - -import java.util.Objects; - -public class InvalidMigrationSetException extends RuntimeException -{ - private final String migrationId; - - public InvalidMigrationSetException(String migrationId, String message) - { - super(message); - this.migrationId = Objects.requireNonNull(migrationId, "migrationId cannot be null"); - } - - public String getMigrationId() - { - return migrationId; - } -} http://git-wip-us.apache.org/repos/asf/curator/blob/1a15582b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java index b3919d1..63a7a7d 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/Migration.java @@ -18,8 +18,10 @@ */ package org.apache.curator.x.async.modeled.migrations; +import org.apache.curator.framework.api.transaction.CuratorOp; +import java.util.List; import java.util.Objects; 
-import java.util.function.UnaryOperator; +import java.util.function.Supplier; public interface Migration { @@ -27,12 +29,12 @@ public interface Migration int version(); - byte[] migrate(byte[] previousBytes); + List<CuratorOp> operations(); - static Migration build(String id, int version, UnaryOperator<byte[]> migrateProc) + static Migration build(String id, int version, Supplier<List<CuratorOp>> operationsProc) { Objects.requireNonNull(id, "id cannot be null"); - Objects.requireNonNull(migrateProc, "migrateProc cannot be null"); + Objects.requireNonNull(operationsProc, "operationsProc cannot be null"); return new Migration() { @Override @@ -48,9 +50,9 @@ public interface Migration } @Override - public byte[] migrate(byte[] previousBytes) + public List<CuratorOp> operations() { - return migrateProc.apply(previousBytes); + return operationsProc.get(); } }; } http://git-wip-us.apache.org/repos/asf/curator/blob/1a15582b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationException.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationException.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationException.java new file mode 100644 index 0000000..1a1d59c --- /dev/null +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationException.java @@ -0,0 +1,37 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.curator.x.async.modeled.migrations; + +import java.util.Objects; + +public class MigrationException extends RuntimeException +{ + private final String migrationId; + + public MigrationException(String migrationId, String message) + { + super(message); + this.migrationId = Objects.requireNonNull(migrationId, "migrationId cannot be null"); + } + + public String getMigrationId() + { + return migrationId; + } +} http://git-wip-us.apache.org/repos/asf/curator/blob/1a15582b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java index 2d5f39f..47adb1e 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManager.java @@ -18,20 +18,142 @@ */ package org.apache.curator.x.async.modeled.migrations; +import com.google.common.base.Throwables; +import org.apache.curator.framework.api.transaction.CuratorOp; +import org.apache.curator.framework.recipes.locks.InterProcessLock; +import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex; import org.apache.curator.x.async.AsyncCuratorFramework; import org.apache.curator.x.async.modeled.ModelSerializer; +import 
org.apache.curator.x.async.modeled.ModelSpec; +import org.apache.curator.x.async.modeled.ModeledFramework; +import org.apache.curator.x.async.modeled.ZNode; import org.apache.curator.x.async.modeled.ZPath; +import org.apache.zookeeper.CreateMode; +import java.time.Duration; +import java.util.ArrayList; +import java.util.Comparator; import java.util.List; +import java.util.Objects; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionStage; +import java.util.concurrent.Executor; +import java.util.concurrent.TimeUnit; +import java.util.stream.Collectors; -public interface MigrationManager +import static org.apache.curator.x.async.AsyncWrappers.*; + +public class MigrationManager { - CompletionStage<List<MetaData>> metaData(ZPath metaDataPath); + private final AsyncCuratorFramework client; + private final ZPath lockPath; + private final ModelSerializer<MetaData> metaDataSerializer; + private final Executor executor; + private final Duration lockMax; + + private static final String META_DATA_NODE_NAME = "meta-"; + + public MigrationManager(AsyncCuratorFramework client, ZPath lockPath, ModelSerializer<MetaData> metaDataSerializer, Executor executor, Duration lockMax) + { + this.client = Objects.requireNonNull(client, "client cannot be null"); + this.lockPath = Objects.requireNonNull(lockPath, "lockPath cannot be null"); + this.metaDataSerializer = Objects.requireNonNull(metaDataSerializer, "metaDataSerializer cannot be null"); + this.executor = Objects.requireNonNull(executor, "executor cannot be null"); + this.lockMax = Objects.requireNonNull(lockMax, "lockMax cannot be null"); + } + + public CompletionStage<List<MetaData>> metaData(ZPath metaDataPath) + { + ModeledFramework<MetaData> modeled = getMetaDataClient(metaDataPath); + return ZNode.models(modeled.childrenAsZNodes()); + } + + public CompletionStage<Void> migrate(MigrationSet set) + { + String lockPath = this.lockPath.child(set.id()).fullPath(); + InterProcessLock lock = new 
InterProcessSemaphoreMutex(client.unwrap(), lockPath); + CompletionStage<Void> lockStage = lockAsync(lock, lockMax.toMillis(), TimeUnit.MILLISECONDS, executor); + return lockStage.thenCompose(__ -> runMigrationInLock(lock, set)); + } + + protected List<Migration> filter(MigrationSet set, List<MetaData> sortedMetaData) throws MigrationException + { + if ( sortedMetaData.size() > set.migrations().size() ) + { + throw new MigrationException(set.id(), String.format("More metadata than migrations. Migration ID: %s - MetaData: %s", set.id(), sortedMetaData)); + } - CompletionStage<Void> run(); + int compareSize = Math.min(set.migrations().size(), sortedMetaData.size()); + List<MetaData> compareMigrations = set.migrations().subList(0, compareSize) + .stream() + .map(m -> new MetaData(m.id(), m.version())) + .collect(Collectors.toList()); + if ( !compareMigrations.equals(sortedMetaData) ) + { + throw new MigrationException(set.id(), String.format("Metadata mismatch. Migration ID: %s - MetaData: %s", set.id(), sortedMetaData)); + } + return set.migrations().subList(sortedMetaData.size(), set.migrations().size()); + } + + private CompletionStage<Void> runMigrationInLock(InterProcessLock lock, MigrationSet set) + { + ModeledFramework<MetaData> modeled = getMetaDataClient(set.metaDataPath()); + return modeled.childrenAsZNodes() + .thenCompose(metaData -> applyMetaData(set, modeled, metaData)) + .handle((v, e) -> { + release(lock, true); + if ( e != null ) + { + Throwables.propagate(e); + } + return v; + } + ); + } + + private ModeledFramework<MetaData> getMetaDataClient(ZPath metaDataPath) + { + ModelSpec<MetaData> modelSpec = ModelSpec.builder(metaDataPath, metaDataSerializer).withCreateMode(CreateMode.PERSISTENT_SEQUENTIAL).build(); + return ModeledFramework.wrap(client, modelSpec); + } + + private CompletionStage<Void> applyMetaData(MigrationSet set, ModeledFramework<MetaData> metaDataClient, List<ZNode<MetaData>> metaDataNodes) + { + List<MetaData> sortedMetaData = 
metaDataNodes + .stream() + .sorted(Comparator.comparing(m -> m.path().fullPath())) + .map(ZNode::model) + .collect(Collectors.toList()); + + List<Migration> toBeApplied; + try + { + toBeApplied = filter(set, sortedMetaData); + } + catch ( MigrationException e ) + { + CompletableFuture<Void> future = new CompletableFuture<>(); + future.completeExceptionally(e); + return future; + } + + if ( toBeApplied.size() == 0 ) + { + return CompletableFuture.completedFuture(null); + } + + return asyncEnsureContainers(client, metaDataClient.modelSpec().path()) + .thenCompose(__ -> applyMetaDataAfterEnsure(toBeApplied, metaDataClient)); + } - static MigrationManagerBuilder builder(AsyncCuratorFramework client, ZPath lockPath, ModelSerializer<MetaData> metaDataSerializer) + private CompletionStage<Void> applyMetaDataAfterEnsure(List<Migration> toBeApplied, ModeledFramework<MetaData> metaDataClient) { - return new MigrationManagerBuilder(client, lockPath, metaDataSerializer); + List<CuratorOp> operations = new ArrayList<>(); + for ( Migration migration : toBeApplied ) + { + operations.addAll(migration.operations()); + MetaData thisMetaData = new MetaData(migration.id(), migration.version()); + operations.add(metaDataClient.child(META_DATA_NODE_NAME).createOp(thisMetaData)); + } + return client.transaction().forOperations(operations).thenApply(__ -> null); } } http://git-wip-us.apache.org/repos/asf/curator/blob/1a15582b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerBuilder.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerBuilder.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerBuilder.java deleted file mode 100644 index ed48242..0000000 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerBuilder.java +++ 
/dev/null @@ -1,68 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.curator.x.async.modeled.migrations; - -import org.apache.curator.x.async.AsyncCuratorFramework; -import org.apache.curator.x.async.modeled.ModelSerializer; -import org.apache.curator.x.async.modeled.ZPath; -import java.time.Duration; -import java.util.ArrayList; -import java.util.List; -import java.util.Objects; -import java.util.concurrent.Executor; - -public class MigrationManagerBuilder -{ - private final AsyncCuratorFramework client; - private final ZPath lockPath; - private final ModelSerializer<MetaData> metaDataSerializer; - private final List<MigrationSet> sets = new ArrayList<>(); - private Executor executor = Runnable::run; - private Duration lockMax = Duration.ofSeconds(15); - - MigrationManagerBuilder(AsyncCuratorFramework client, ZPath lockPath, ModelSerializer<MetaData> metaDataSerializer) - { - this.client = Objects.requireNonNull(client, "client cannot be null"); - this.lockPath = Objects.requireNonNull(lockPath, "lockPath cannot be null"); - this.metaDataSerializer = Objects.requireNonNull(metaDataSerializer, "metaDataSerializer cannot be null"); - } - - public MigrationManager build() - { - 
return new MigrationManagerImpl(client, lockPath, metaDataSerializer, executor, lockMax, sets); - } - - public MigrationManagerBuilder withExecutor(Executor executor) - { - this.executor = Objects.requireNonNull(executor, "executor cannot be null"); - return this; - } - - public MigrationManagerBuilder withLockMax(Duration lockMax) - { - this.lockMax = Objects.requireNonNull(lockMax, "lockMax cannot be null"); - return this; - } - - public MigrationManagerBuilder adding(MigrationSet set) - { - sets.add(Objects.requireNonNull(set, "set cannot be null")); - return this; - } -} http://git-wip-us.apache.org/repos/asf/curator/blob/1a15582b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerImpl.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerImpl.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerImpl.java deleted file mode 100644 index 15c61b2..0000000 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationManagerImpl.java +++ /dev/null @@ -1,181 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, - * software distributed under the License is distributed on an - * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY - * KIND, either express or implied. 
See the License for the - * specific language governing permissions and limitations - * under the License. - */ -package org.apache.curator.x.async.modeled.migrations; - -import com.google.common.base.Throwables; -import com.google.common.collect.ImmutableList; -import org.apache.curator.framework.api.transaction.CuratorOp; -import org.apache.curator.framework.recipes.locks.InterProcessLock; -import org.apache.curator.framework.recipes.locks.InterProcessSemaphoreMutex; -import org.apache.curator.x.async.AsyncCuratorFramework; -import org.apache.curator.x.async.modeled.ModelSerializer; -import org.apache.curator.x.async.modeled.ModelSpec; -import org.apache.curator.x.async.modeled.ModeledFramework; -import org.apache.curator.x.async.modeled.ZNode; -import org.apache.curator.x.async.modeled.ZPath; -import org.apache.zookeeper.CreateMode; -import java.time.Duration; -import java.util.AbstractMap; -import java.util.ArrayList; -import java.util.Comparator; -import java.util.List; -import java.util.Map; -import java.util.concurrent.CompletableFuture; -import java.util.concurrent.CompletionStage; -import java.util.concurrent.Executor; -import java.util.concurrent.TimeUnit; -import java.util.stream.Collectors; - -import static org.apache.curator.x.async.AsyncWrappers.*; - -class MigrationManagerImpl implements MigrationManager -{ - private final AsyncCuratorFramework client; - private final ZPath lockPath; - private final ModelSerializer<MetaData> metaDataSerializer; - private final Executor executor; - private final Duration lockMax; - private final List<MigrationSet> sets; - - private static final String META_DATA_NODE_NAME = "meta-"; - - MigrationManagerImpl(AsyncCuratorFramework client, ZPath lockPath, ModelSerializer<MetaData> metaDataSerializer, Executor executor, Duration lockMax, List<MigrationSet> sets) - { - this.client = client; - this.lockPath = lockPath; - this.metaDataSerializer = metaDataSerializer; - this.executor = executor; - this.lockMax = lockMax; - 
this.sets = ImmutableList.copyOf(sets); - } - - @Override - public CompletionStage<List<MetaData>> metaData(ZPath metaDataPath) - { - ModeledFramework<MetaData> modeled = getMetaDataClient(metaDataPath); - return ZNode.models(modeled.childrenAsZNodes()); - } - - @Override - public CompletionStage<Void> run() - { - Map<String, CompletableFuture<Void>> futures = sets - .stream() - .map(m -> new AbstractMap.SimpleEntry<>(m.id(), runMigration(m).toCompletableFuture())) - .collect(Collectors.toMap(Map.Entry::getKey, Map.Entry::getValue)); - return CompletableFuture.allOf(futures.values().toArray(new CompletableFuture[futures.size()])); - } - - private CompletionStage<Void> runMigration(MigrationSet set) - { - String lockPath = this.lockPath.child(set.id()).fullPath(); - InterProcessLock lock = new InterProcessSemaphoreMutex(client.unwrap(), lockPath); - CompletionStage<Void> lockStage = lockAsync(lock, lockMax.toMillis(), TimeUnit.MILLISECONDS, executor); - return lockStage.thenCompose(__ -> runMigrationInLock(lock, set)); - } - - private CompletionStage<Void> runMigrationInLock(InterProcessLock lock, MigrationSet set) - { - ModeledFramework<MetaData> modeled = getMetaDataClient(set.metaDataPath()); - return modeled.childrenAsZNodes() - .thenCompose(metaData -> applyMetaData(set, modeled, metaData)) - .handle((v, e) -> { - release(lock, true); - if ( e != null ) - { - Throwables.propagate(e); - } - return v; - } - ); - } - - private ModeledFramework<MetaData> getMetaDataClient(ZPath metaDataPath) - { - ModelSpec<MetaData> modelSpec = ModelSpec.builder(metaDataPath, metaDataSerializer).withCreateMode(CreateMode.PERSISTENT_SEQUENTIAL).build(); - return ModeledFramework.wrap(client, modelSpec); - } - - protected void checkIsValid(MigrationSet set, List<MetaData> sortedMetaData) throws InvalidMigrationSetException - { - if ( sortedMetaData.size() > set.migrations().size() ) - { - throw new InvalidMigrationSetException(set.id(), String.format("More metadata than migrations. 
Migration ID: %s - MetaData: %s", set.id(), sortedMetaData)); - } - - int compareSize = Math.min(set.migrations().size(), sortedMetaData.size()); - List<MetaData> compareMigrations = set.migrations().subList(0, compareSize) - .stream() - .map(m -> new MetaData(m.id(), m.version())) - .collect(Collectors.toList()); - if ( !compareMigrations.equals(sortedMetaData) ) - { - throw new InvalidMigrationSetException(set.id(), String.format("Metadata mismatch. Migration ID: %s - MetaData: %s", set.id(), sortedMetaData)); - } - } - - private CompletionStage<Void> applyMetaData(MigrationSet set, ModeledFramework<MetaData> metaDataClient, List<ZNode<MetaData>> metaDataNodes) - { - List<MetaData> sortedMetaData = metaDataNodes - .stream() - .sorted(Comparator.comparing(m -> m.path().fullPath())) - .map(ZNode::model) - .collect(Collectors.toList()); - try - { - checkIsValid(set, sortedMetaData); - } - catch ( InvalidMigrationSetException e ) - { - CompletableFuture<Void> future = new CompletableFuture<>(); - future.completeExceptionally(e); - return future; - } - - List<Migration> toBeApplied = set.migrations().subList(sortedMetaData.size(), set.migrations().size()); - if ( toBeApplied.size() == 0 ) - { - return CompletableFuture.completedFuture(null); - } - - return asyncEnsureContainers(client, metaDataClient.modelSpec().path()) - .thenCompose(__ -> applyMetaDataAfterEnsure(set, toBeApplied, metaDataClient)); - } - - private CompletionStage<Void> applyMetaDataAfterEnsure(MigrationSet set, List<Migration> toBeApplied, ModeledFramework<MetaData> metaDataClient) - { - ModelSpec<byte[]> modelSpec = ModelSpec.builder(set.path(), ModelSerializer.raw).build(); - ModeledFramework<byte[]> modeled = ModeledFramework.wrap(client, modelSpec); - return modeled.childrenAsZNodes().thenCompose(nodes -> { - List<CuratorOp> operations = new ArrayList<>(); - for ( ZNode<byte[]> node : nodes ) - { - byte[] currentBytes = node.model(); - for ( Migration migration : toBeApplied ) - { - currentBytes 
= migration.migrate(currentBytes); - MetaData thisMetaData = new MetaData(migration.id(), migration.version()); - operations.add(metaDataClient.child(META_DATA_NODE_NAME).createOp(thisMetaData)); - } - operations.add(modeled.child(node.path().nodeName()).updateOp(currentBytes, node.stat().getVersion())); - } - return client.transaction().forOperations(operations).thenApply(__ -> null); - }); - } -} http://git-wip-us.apache.org/repos/asf/curator/blob/1a15582b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java ---------------------------------------------------------------------- diff --git a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java index 9e41989..0a0dbe0 100644 --- a/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java +++ b/curator-x-async/src/main/java/org/apache/curator/x/async/modeled/migrations/MigrationSet.java @@ -27,16 +27,13 @@ public interface MigrationSet { String id(); - ZPath path(); - ZPath metaDataPath(); List<Migration> migrations(); - static MigrationSet build(String id, ZPath path, ZPath metaDataPath, List<Migration> migrations) + static MigrationSet build(String id, ZPath metaDataPath, List<Migration> migrations) { Objects.requireNonNull(id, "id cannot be null"); - Objects.requireNonNull(path, "path cannot be null"); Objects.requireNonNull(metaDataPath, "metaDataPath cannot be null"); final List<Migration> migrationsCopy = ImmutableList.copyOf(migrations); return new MigrationSet() @@ -48,12 +45,6 @@ public interface MigrationSet } @Override - public ZPath path() - { - return path; - } - - @Override public ZPath metaDataPath() { return metaDataPath; http://git-wip-us.apache.org/repos/asf/curator/blob/1a15582b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java 
---------------------------------------------------------------------- diff --git a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java index d709abe..daf69cd 100644 --- a/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java +++ b/curator-x-async/src/test/java/org/apache/curator/x/async/modeled/migrations/TestMigrationManager.java @@ -21,6 +21,7 @@ package org.apache.curator.x.async.modeled.migrations; import com.fasterxml.jackson.databind.ObjectMapper; import org.apache.curator.framework.CuratorFramework; import org.apache.curator.framework.CuratorFrameworkFactory; +import org.apache.curator.framework.api.transaction.CuratorOp; import org.apache.curator.retry.RetryOneTime; import org.apache.curator.utils.CloseableUtils; import org.apache.curator.x.async.AsyncCuratorFramework; @@ -37,7 +38,11 @@ import org.testng.annotations.AfterMethod; import org.testng.annotations.BeforeMethod; import org.testng.annotations.Test; import java.io.IOException; +import java.time.Duration; import java.util.Arrays; +import java.util.Collections; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; import java.util.function.UnaryOperator; public class TestMigrationManager extends CompletableBaseClassForTests @@ -47,6 +52,7 @@ public class TestMigrationManager extends CompletableBaseClassForTests private ModelSpec<ModelV1> v1Spec; private ModelSpec<ModelV2> v2Spec; private ModelSpec<ModelV3> v3Spec; + private ExecutorService executor; @BeforeMethod @Override @@ -54,10 +60,10 @@ public class TestMigrationManager extends CompletableBaseClassForTests { super.setup(); - CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(100)); - client.start(); + CuratorFramework 
rawClient = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(100)); + rawClient.start(); - this.client = AsyncCuratorFramework.wrap(client); + this.client = AsyncCuratorFramework.wrap(rawClient); ObjectMapper mapper = new ObjectMapper(); UnaryOperator<byte[]> from1to2 = bytes -> { @@ -89,13 +95,20 @@ public class TestMigrationManager extends CompletableBaseClassForTests ZPath modelPath = ZPath.parse("/test/it"); - Migration m1 = Migration.build("1",1, from1to2); - Migration m2 = Migration.build("2",1, from2to3); - migrationSet = MigrationSet.build("1", modelPath, ZPath.parse("/metadata"), Arrays.asList(m1, m2)); - v1Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV1.class)).build(); v2Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV2.class)).build(); v3Spec = ModelSpec.builder(modelPath, JacksonModelSerializer.build(ModelV3.class)).build(); + + CuratorOp v1op = ModeledFramework.wrap(client, v1Spec).createOp(new ModelV1("Test")); + CuratorOp v2op = ModeledFramework.wrap(client, v2Spec).updateOp(new ModelV2("Test 2", 10)); + CuratorOp v3op = ModeledFramework.wrap(client, v3Spec).updateOp(new ModelV3("One", "Two", 30)); + + Migration m1 = Migration.build("1",1, () -> Collections.singletonList(v1op)); + Migration m2 = Migration.build("2",1, () -> Collections.singletonList(v2op)); + Migration m3 = Migration.build("3",1, () -> Collections.singletonList(v3op)); + migrationSet = MigrationSet.build("1", ZPath.parse("/metadata"), Arrays.asList(m1, m2, m3)); + + executor = Executors.newCachedThreadPool(); } @AfterMethod @@ -103,6 +116,7 @@ public class TestMigrationManager extends CompletableBaseClassForTests public void teardown() throws Exception { CloseableUtils.closeQuietly(client.unwrap()); + executor.shutdownNow(); super.teardown(); } @@ -113,11 +127,8 @@ public class TestMigrationManager extends CompletableBaseClassForTests ModelV1 v1 = new ModelV1("John 
Galt"); complete(v1Client.child("1").set(v1)); - MigrationManager manager = MigrationManager.builder(this.client, ZPath.parse("/locks"), JacksonModelSerializer.build(MetaData.class)) - .adding(migrationSet) - .build(); - - complete(manager.run()); + MigrationManager manager = new MigrationManager(client, ZPath.parse("/locks"), JacksonModelSerializer.build(MetaData.class), executor, Duration.ofMinutes(10)); + complete(manager.migrate(migrationSet)); ModeledFramework<ModelV3> v3Client = ModeledFramework.wrap(client, v3Spec); complete(v3Client.child("1").read(), (m, e) -> {
