This is an automated email from the ASF dual-hosted git repository. agura pushed a commit to branch ignite-14198 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit a2de9dd5c26636c98f8eab92f42560d210f5d980 Author: Andrey Gura <ag...@apache.org> AuthorDate: Wed Mar 3 12:59:35 2021 +0300 IGNITE-14918 Meta storage interface: batch operations, watches, conditional updates --- .../org/apache/ignite/metastorage/Example.java | 56 ++++ .../metastorage/client/MetaStorageService.java | 300 +++++++++++++++++---- .../metastorage/common/AbstractCondition.java | 77 ------ .../metastorage/common/CompactedException.java | 59 ++++ .../ignite/metastorage/common/Condition.java | 132 ++++++++- .../common/{ConditionType.java => Conditions.java} | 23 +- .../apache/ignite/metastorage/common/Entry.java | 124 +-------- ...ndition.java => OperationTimeoutException.java} | 42 +-- .../ignite/metastorage/common/PutUpdate.java | 47 ---- .../metastorage/common/RevisionCondition.java | 48 ---- .../apache/ignite/metastorage/common/Update.java | 35 ++- .../common/{RemoveUpdate.java => Updates.java} | 28 +- 12 files changed, 570 insertions(+), 401 deletions(-) diff --git a/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/Example.java b/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/Example.java new file mode 100644 index 0000000..484ed3e --- /dev/null +++ b/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/Example.java @@ -0,0 +1,56 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.ignite.metastorage; + +import org.apache.ignite.metastorage.client.MetaStorageService; + +import static org.apache.ignite.metastorage.common.Conditions.revision; +import static org.apache.ignite.metastorage.common.Conditions.value; +import static org.apache.ignite.metastorage.common.Updates.noop; +import static org.apache.ignite.metastorage.common.Updates.put; +import static org.apache.ignite.metastorage.common.Updates.remove; + +public class Example { + + /** + * Usage of conditional update example. + * @param args + */ + @SuppressWarnings("ConstantConditions") + public static void main(String[] args) { + MetaStorageService srv = getMetaStorageService(); + + byte[] key = "key".getBytes(); + byte[] val = "val".getBytes(); + byte[] newVal = "newVal".getBytes(); + + srv.update(key, revision().less(10), put(val), remove()); + srv.update(key, value().equal(newVal), put(val), noop()); + + // Soon... + // srv.update(revision(key).less(10), put(key, val), remove(key)); + // srv.update(value(key).equal(newVal), put(key, val), noop()); + + } + + /** + * @return Meta storage service. + */ + public static MetaStorageService getMetaStorageService() { + return null; + } +} diff --git a/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/MetaStorageService.java b/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/MetaStorageService.java index 178f633..c18f481 100644 --- a/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/MetaStorageService.java +++ b/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/MetaStorageService.java @@ -18,138 +18,326 @@ package org.apache.ignite.metastorage.client; +import org.apache.ignite.metastorage.common.CompactedException; import org.apache.ignite.metastorage.common.Condition; import org.apache.ignite.metastorage.common.Entry; +import org.apache.ignite.metastorage.common.OperationTimeoutException; import org.apache.ignite.metastorage.common.Update; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.Nullable; +import java.util.Collection; import java.util.List; -import java.util.concurrent.Future; +import java.util.UUID; +import java.util.concurrent.CompletableFuture; import java.util.function.BiConsumer; import java.util.function.Consumer; +import java.util.stream.Stream; /** * Defines interface for access to a metastorage service. */ public interface MetaStorageService { /** - * Returns metastorage revision. + * Retrieves an entry for the given key. * - * @return Metastorage revision. + * @param key Key. Couldn't be {@code null}. + * @return An entry for the given key. Couldn't be {@code null}. + * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result. + * @see Entry */ @NotNull - Future<Long> revision(); + CompletableFuture<Entry> get(@NotNull byte[] key); /** - * Updates entry with given key and value. + * Retrieves an entry for the given key and the revision upper bound. * - * @param key Key. Couldn't be {@code null}. - * @param value Value.Couldn't be {@code null}. - * @return A previous entry which could be regular, empty or tombstone. Couldn't be {@code null}. + * @param key The key. Couldn't be {@code null}. + * @param revUpperBound The upper bound for entry revisions. Must be positive. + * @return An entry for the given key and maximum revision limited by {@code revUpperBound}. + * Couldn't be {@code null}. + * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result. + * @throws CompactedException If the desired revisions are removed from the storage due to a compaction. + * Will be thrown on getting future result. * @see Entry */ @NotNull - Future<Entry> put(@NotNull byte[] key, @NotNull byte[] value); + CompletableFuture<Entry> get(@NotNull byte[] key, long revUpperBound); /** - * Retrieves entry with a given key. + * Retrieves entries for given keys. * - * @param key Key. Couldn't be {@code null}. - * @return An entry for given key or an empty/tombstone entry. Couldn't be {@code null}. + * @param keys The collection of keys. Couldn't be {@code null} or empty. + * Collection elements couldn't be {@code null}. + * @return A list of entries for given keys. The order of entries in the result list corresponds to + * the traversal order of {@code keys} collection. Couldn't be {@code null}. + * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result. * @see Entry */ @NotNull - Future<Entry> get(@NotNull byte[] key); + CompletableFuture<List<Entry>> getAll(Collection<byte[]> keys); /** - * Retrieves entry with a given key and a revision. + * Retrieves entries for given keys and the revision upper bound. * - * @param key Key. Couldn't be {@code null}. - * @param rev Revision. Must be positive. - * @return An entry for given key and a revision or an empty/tombstone entry. Couldn't be {@code null}. + * @param keys The collection of keys. Couldn't be {@code null} or empty. + * Collection elements couldn't be {@code null}. + * @param revUpperBound The upper bound for entry revisions. Must be positive. + * @return A list of entries for given keys and maximum revision limited by {@code revUpperBound}. + * The order of entries in the result list corresponds to the traversal order of {@code keys} collection. + * Couldn't be {@code null}. + * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result. + * @throws CompactedException If the desired revisions are removed from the storage due to a compaction. + * Will be thrown on getting future result. * @see Entry */ - //TODO: Is it really needed??? @NotNull - Future<Entry> get(@NotNull byte[] key, long rev); + CompletableFuture<List<Entry>> getAll(Collection<byte[]> keys, long revUpperBound); /** - * Removes an entry with a given key. + * Inserts or updates an entry with the given key and the given value. * - * @param key Key. Couldn't be {@code null}. - * @return A previous entry which could be regular, empty or tombstone. Couldn't be {@code null}. + * @param key The key. Couldn't be {@code null}. + * @param value The value.Couldn't be {@code null}. + * @return Completed future. + * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result. * @see Entry */ @NotNull - Future<Entry> remove(@NotNull byte[] key); + CompletableFuture<Void> put(@NotNull byte[] key, @NotNull byte[] value); /** - * Updates entry conditionally. + * Inserts or updates an entry with the given key and the given value and + * retrieves a previous entry for the given key. * - * @param key Key. Couldn't be {@code null}. - * @param condition Condition. - * @param success Update which will be applied in case of condition success. - * @param failure Update which will be applied in case of condition failure. - * @return A previous entry which could be regular, empty or tombstone. Couldn't be {@code null}. + * @param key The key. Couldn't be {@code null}. + * @param value The value.Couldn't be {@code null}. + * @return A previous entry for the given key. Couldn't be {@code null}. + * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result. + * @see Entry + */ + @NotNull + CompletableFuture<Entry> getAndPut(@NotNull byte[] key, @NotNull byte[] value); + + /** + * Inserts or updates entries with given keys and given values. + * Size of {@code keys} and {@code values} must be the same. + * + * @param keys The list of keys. Couldn't be {@code null} or empty. + * @param values The list of values corresponding to the list of keys. Couldn't be {@code null} or empty. + * @return Completed future. + * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result. + * @see Entry + */ + @NotNull + CompletableFuture<Void> putAll(@NotNull List<byte[]> keys, @NotNull List<byte[]> values); + + /** + * Inserts or updates entries with given keys and given values and + * retrieves a previous entries for given keys. + * Size of {@code keys} and {@code values} must be the same. + * + * @param keys The list of keys. Couldn't be {@code null} or empty. + * @param values The list of values corresponding to the list of keys. Couldn't be {@code null} or empty. + * @return A list of entries for given keys. Couldn't be {@code null}. + * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result. + * @see Entry + */ + @NotNull + CompletableFuture<List<Entry>> getAndPutAll(@NotNull List<byte[]> keys, @NotNull List<byte[]> values); + + /** + * Removes an entry for the given key. + * + * @param key The key. Couldn't be {@code null}. + * @return Completed future. + * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result. + * @see Entry + */ + @NotNull + CompletableFuture<Void> remove(@NotNull byte[] key); + + /** + * Removes an entry for the given key. + * + * @param key The key. Couldn't be {@code null}. + * @return A previous entry for the given key. Couldn't be {@code null}. + * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result. + * @see Entry + */ + @NotNull + CompletableFuture<Void> getAndRemove(@NotNull byte[] key); + + /** + * Removes entries for given keys. + * + * @param key The key. Couldn't be {@code null}. + * @return Completed future. + * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result. + * @see Entry + */ + @NotNull + CompletableFuture<Void> removeAll(@NotNull Collection<byte[]> key); + + /** + * Removes entries for given keys and retrieves previous entries. + * + * @param key The key. Couldn't be {@code null}. + * @return A list of previous entries for given keys.. + * The order of entries in the result list corresponds to the traversal order of {@code keys} collection. + * Couldn't be {@code null}. + * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result. + * @see Entry + */ + @NotNull + CompletableFuture<List<Entry>> getAndRemoveAll(@NotNull Collection<byte[]> key); + + + /** + * Updates an entry for the given key conditionally. + * + * <p>Conditional update could be treated as <i>if(condition)-then(success)-else(failure)</i> expression.</p> + * + * @param key The key. Couldn't be {@code null}. + * @param condition The condition. + * @param success The update which will be applied in case of condition evaluation yields {@code true}. + * @param failure The update which will be applied in case of condition evaluation yields {@code false}. + * @return Future result {@code true} if {@code success} update was applied, otherwise {@code false}. + * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result. * @see Entry + * @see Condition + * @see Update */ - Future<Entry> update(@NotNull byte[] key, Condition condition, Update success, Update failure); + // TODO: https://issues.apache.org/jira/browse/IGNITE-14269: will be replaced by conditional multi update. + @NotNull + CompletableFuture<Boolean> update(@NotNull byte[] key, @NotNull Condition condition, + @NotNull Update success, @NotNull Update failure); + /** - * Updates multiple entries conditionally. + * Updates an entry for the given key conditionally. + * + * <p>Conditional update could be treated as <i>if(condition)-then(success)-else(failure)</i> expression.</p> * - * @param keys List of keys. - * @param condition List of conditions corresponding to keys list. - * @param success List of updates which will be applied to corresponding key in case of condition success. - * @param failure List of updates which will be applied to corresponding key in case of condition failure. - * @return A List of previous entries corresponding to list of keys, where each entry could be regular, - * empty or tombstone. Couldn't be {@code null}. + * @param key The key. Couldn't be {@code null}. + * @param condition The condition. + * @param success The update which will be applied in case of condition evaluation yields {@code true}. + * @param failure The update which will be applied in case of condition evaluation yields {@code false}. + * @return A previous entry for the given key. + * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result. * @see Entry + * @see Condition + * @see Update */ - // TODO: If I understand correctly, we always use one success and one failure condition for each key in transaction. May be I'm wrong. - // TODO: Probably, we should provide no-op conditions also (e.g. for implementation if-then only logic). - Future<List<Entry>> update(List<byte[]> keys, List<Condition> condition, List<Update> success, List<Update> failure); + // TODO: https://issues.apache.org/jira/browse/IGNITE-14269: will be replaced by conditional multi update. + @NotNull + CompletableFuture<Entry> getAndUpdate(@NotNull byte[] key, @NotNull Condition condition, + @NotNull Update success, @NotNull Update failure); /** - * Retrieves entries for a given key range in lexicographic order. - * Only entries with the latest revisions will be returned. + * Retrieves entries for the given key range in lexicographic order. Entries will be filtered out by upper bound + * of given revision number. * * @param keyFrom Start key of range (inclusive). Couldn't be {@code null}. * @param keyTo End key of range (exclusive). Could be {@code null}. - * @param consumer Entry consumer which will be invoked for each entry. Entry couldn't be {@code null}. - * @return Future which will be completed when iteration will be finished. Couldn't be {@code null}. + * @param revUpperBound The upper bound for entry revision. {@code -1} means latest revision. + * @return Stream of entries corresponding to the given range and revision. + * @throws OperationTimeoutException If the operation is timed out. + * @throws CompactedException If the desired revisions are removed from the storage due to a compaction. + * @see Entry */ @NotNull - Future<Void> iterate(@NotNull byte[] keyFrom, @Nullable byte[] keyTo, @NotNull Consumer<Entry> consumer); + Stream<Entry> range(@NotNull byte[] keyFrom, @Nullable byte[] keyTo, long revUpperBound); /** - * Creates watcher on metastorage updates with given parameters. + * Retrieves entries for the given key range in lexicographic order. Short cut for + * {@link #range(byte[], byte[], long)} where {@code revUpperBound == -1}. + * + * @param keyFrom Start key of range (inclusive). Couldn't be {@code null}. + * @param keyTo End key of range (exclusive). Could be {@code null}. + * @return Stream of entries corresponding to the given range and revision. + * @throws OperationTimeoutException If the operation is timed out. + * @throws CompactedException If the desired revisions are removed from the storage due to a compaction. + * @see Entry + */ + @NotNull + Stream<Entry> range(@NotNull byte[] keyFrom, @Nullable byte[] keyTo); + + /** + * Subscribes on meta storage updates matching the parameters. * * @param keyFrom Start key of range (inclusive). Could be {@code null}. * @param keyTo End key of range (exclusive). Could be {@code null}. - * @param revision Start revision. - * @param consumer Entry consumer which will be invoked for each update. Entry couldn't be {@code null}. - * @return Watch identifier. + * @param revision Start revision inclusive. {@code 0} - all revision, + * {@code -1} - latest revision (accordingly to current meta storage state). + * @param consumer Entries consumer which will be invoked for each update. Entry couldn't be {@code null}. + * @return Subscription identifier. Could be used in {@link #stopWatch} method in order to cancel subscription. + * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result. + * @throws CompactedException If the desired revisions are removed from the storage due to a compaction. + * Will be thrown on getting future result. + * @throws WatchTerminatedException + * @see Entry */ @NotNull - Future<Long> watch(@Nullable byte[] keyFrom, @Nullable byte[] keyTo, long revision, - @NotNull BiConsumer<Entry, Entry> consumer); + //TODO: Q: WatchTerminatedException??? + CompletableFuture<UUID> watch(@Nullable byte[] keyFrom, @Nullable byte[] keyTo, long revision, + @NotNull BiConsumer<List<Entry>, List<Entry>> consumer); + + /** + * Subscribes on meta storage updates for the given key. + * + * @param key The target key. Could be {@code null}. + * @param revision Start revision inclusive. {@code 0} - all revision, + * {@code -1} - latest revision (accordingly to current meta storage state). + * @param consumer Entries consumer which will be invoked for each update. Entry couldn't be {@code null}. + * @return Subscription identifier. Could be used in {@link #stopWatch} method in order to cancel subscription. + * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result. + * @throws CompactedException If the desired revisions are removed from the storage due to a compaction. + * Will be thrown on getting future result. + * @throws WatchTerminatedException + * @see Entry + */ + @NotNull + //TODO: Q: WatchTerminatedException??? + CompletableFuture<UUID> watch(@NotNull byte[] key, long revision, + @NotNull Consumer<Entry> consumer); + + /** + * Subscribes on meta storage updates for given keys. + * + * @param keys Collection of target keys. Could be {@code null}. + * @param revision Start revision inclusive. {@code 0} - all revision, + * {@code -1} - latest revision (accordingly to current meta storage state). + * @param consumer Entries consumer which will be invoked for each update. Entry couldn't be {@code null}. + * @return Subscription identifier. Could be used in {@link #stopWatch} method in order to cancel subscription. + * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result. + * @throws CompactedException If the desired revisions are removed from the storage due to a compaction. + * Will be thrown on getting future result. + * @throws WatchTerminatedException + * @see Entry + */ + @NotNull + //TODO: Q: WatchTerminatedException??? + CompletableFuture<UUID> watch(@NotNull Collection<byte[]> keys, long revision, + @NotNull Consumer<List<Entry>> consumer); + /** - * Stops watch with a given identifier. + * Cancels subscription for the given identifier. * - * @param watchId Watch identifier. + * @param id Subscription identifier. * @return Completed future in case of operation success. Couldn't be {@code null}. + * * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result. */ @NotNull - Future<Void> stopWatch(long watchId); + CompletableFuture<Void> stopWatch(@NotNull UUID id); /** - * Compacts metastorage (removes all tombstone entries and old entries except of entries with latest revision). + * Compacts meta storage (removes all tombstone entries and old entries except of entries with latest revision). * * @return Completed future. Couldn't be {@code null}. + * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result. */ - Future<Void> compact(); + CompletableFuture<Void> compact(); } diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/AbstractCondition.java b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/AbstractCondition.java deleted file mode 100644 index 9bfb570..0000000 --- a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/AbstractCondition.java +++ /dev/null @@ -1,77 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.metastorage.common; - -import static org.apache.ignite.metastorage.common.ConditionType.EQUAL; -import static org.apache.ignite.metastorage.common.ConditionType.GREATER; -import static org.apache.ignite.metastorage.common.ConditionType.LESS; -import static org.apache.ignite.metastorage.common.ConditionType.NOT_EQUAL; - -/** - * Abstract condition which is designed for a comparison based conditions. - * - * <p>The comparison type is defined by {@link ConditionType} enumeration. Only {@link #compare(Entry)} method - * should be implemented for getting correct comparison based condition evaluation.</p> - */ -public abstract class AbstractCondition implements Condition { - /** Condition type. */ - private final ConditionType type; - - /** - * Constructor. - * - * @param type Condition type. - */ - public AbstractCondition(ConditionType type) { - this.type = type; - } - - /** - * Returns condition type for this condition. - * - * @return Condition type. - */ - protected ConditionType conditionType() { - return type; - } - - /** - * Evaluates comparison based condition. - * - * @param e Entry which is a subject of conditional update. - * @return {@code True} if condition is successful, otherwise - {@code false}. - */ - @Override public boolean eval(Entry e) { - int res = compare(e); - - ConditionType type = conditionType(); - - return (type == EQUAL && res == 0) || - (type == NOT_EQUAL && res != 0) || - (type == LESS && res < 0) || - (type == GREATER && res > 0); - } - - /** - * This abstract method should implement comparison logic based on {@link java.util.Comparator} contract. - * - * @param e Entry. - * @return Comparison result as defined {@link java.util.Comparator} contract. - */ - abstract protected int compare(Entry e); -} diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/CompactedException.java b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/CompactedException.java new file mode 100644 index 0000000..6ea9f43 --- /dev/null +++ b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/CompactedException.java @@ -0,0 +1,59 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one or more + * contributor license agreements. See the NOTICE file distributed with + * this work for additional information regarding copyright ownership. + * The ASF licenses this file to You under the Apache License, Version 2.0 + * (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.ignite.metastorage.common; + +/** + * Thrown when a requested operation on meta storage could not be performed because target revisions were removed + * from storage due to a compaction procedure. In such case the operation should be retried with actual revision. + */ +public class CompactedException extends RuntimeException { + /** + * Constructs an exception. + */ + public CompactedException() { + super(); + } + + /** + * Constructs an exception with a given message. + * + * @param message Detail message. + */ + public CompactedException(String message) { + super(message); + } + + /** + * Constructs an exception with a given message and a cause. + * + * @param message Detail message. + * @param cause Cause. + */ + public CompactedException(String message, Throwable cause) { + super(message, cause); + } + + /** + * Constructs an exception with a given cause. + * + * @param cause Cause. + */ + public CompactedException(Throwable cause) { + super(cause); + } +} diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Condition.java b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Condition.java index e1a81ed..6fa3758 100644 --- a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Condition.java +++ b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Condition.java @@ -17,15 +17,129 @@ package org.apache.ignite.metastorage.common; +import java.util.Arrays; + /** - * Defines condition interface for metastorage conditional update. + * Represents condition for conditional update. */ -public interface Condition { - /** - * Should implements logic for condition evaluation. - * - * @param e Entry which is a subject of conditional update. - * @return {@code True} if condition is successful, otherwise - {@code false}. - */ - boolean eval(Entry e); +public final class Condition { + private final InnerCondition cnd; + + Condition(InnerCondition cnd) { + this.cnd = cnd; + } + + public boolean test(Entry e) { + return cnd.test(e); + } + + public static final class RevisionCondition implements InnerCondition { + private Type type; + private long rev; + + RevisionCondition() { + // No-op. + } + + public Condition equal(long rev) { + validate(); + + this.type = Type.EQUAL; + this.rev = rev; + + return new Condition(this); + } + + public Condition notEqual(long rev) { + validate(); + + this.type = Type.NOT_EQUAL; + this.rev = rev; + + return new Condition(this); + } + + public Condition greater(long rev) { + validate(); + + this.type = Type.GREATER; + this.rev = rev; + + return new Condition(this); + } + + public Condition less(long rev) { + validate(); + + this.type = Type.LESS; + this.rev = rev; + + return new Condition(this); + } + + @Override public boolean test(Entry e) { + int res = Long.compare(e.revision(), rev); + + return (type == Type.EQUAL && res == 0) || + (type == Type.NOT_EQUAL && res != 0) || + (type == Type.GREATER && res > 0) || + (type == Type.LESS && res < 0); + } + + private void validate() { + if (type != null) + throw new IllegalStateException("Condition type " + type.name() + " is already defined."); + } + + enum Type { + EQUAL, NOT_EQUAL, GREATER, LESS; + } + } + + public static final class ValueCondition implements InnerCondition { + private Type type; + private byte[] val; + + ValueCondition() { + // No-op. + } + + public Condition equal(byte[] val) { + validate(); + + this.type = Type.EQUAL; + this.val = val; + + return new Condition(this); + } + + public Condition notEqual(byte[] val) { + validate(); + + this.type = Type.NOT_EQUAL; + this.val = val; + + return new Condition(this); + } + + @Override public boolean test(Entry e) { + int res = Arrays.compare(e.value(), val); + + return (type == Type.EQUAL && res == 0) || + (type == Type.NOT_EQUAL && res != 0); + } + + private void validate() { + if (type != null) + throw new IllegalStateException("Condition type " + type.name() + " is already defined."); + } + + enum Type { + EQUAL, NOT_EQUAL + } + } + + private interface InnerCondition { + boolean test(Entry e); + } } diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/ConditionType.java b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Conditions.java similarity index 73% rename from modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/ConditionType.java rename to modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Conditions.java index 16383d2..08e5326 100644 --- a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/ConditionType.java +++ b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Conditions.java @@ -17,19 +17,16 @@ package org.apache.ignite.metastorage.common; -/** - * Defines available condition types. - */ -enum ConditionType { - /** Equal to smth. */ - EQUAL, - - /** Not equal to smth. */ - NOT_EQUAL, +public final class Conditions { + public static Condition.RevisionCondition revision() { + return new Condition.RevisionCondition(); + } - /** Greater than smth. */ - GREATER, + public static Condition.ValueCondition value() { + return new Condition.ValueCondition(); + } - /** Less than smth. */ - LESS + private Conditions() { + // No-op. + } } diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Entry.java b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Entry.java index 719809f..b01128e 100644 --- a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Entry.java +++ b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Entry.java @@ -27,136 +27,26 @@ import org.jetbrains.annotations.Nullable; * <ul>value - a data which is associated with a key and represented as an array of bytes.</ul> * <ul>revision - a number which denotes a version of whole meta storage. Each change increments the revision.</ul> * </ul> - * - * Instance of {@link #Entry} could represents: - * <ul> - * <li>A regular entry which stores a particular key, a value and a revision number.</li> - * <li>An empty entry which denotes absence a regular entry in the meta storage for a given key. - * A revision is 0 for such kind of entry.</li> - * <li>A tombstone entry which denotes that a regular entry for a given key was removed from storage on some revision.</li> - * </ul> */ -public class Entry { - /** Entry key. Couldn't be {@code null}. */ - @NotNull - final private byte[] key; - - /** - * Entry value. - * <p> - * {@code val == null} only for {@link #empty()} and {@link #tombstone()} entries. - * </p> - */ - @Nullable - final private byte[] val; - - /** - * Revision number corresponding to this particular entry. - * <p> - * {@code rev == 0} for {@link #empty()} entry, - * {@code rev > 0} for regular and {@link #tombstone()} entries. - * </p> - */ - final private long rev; - - /** - * Constructor. - * - * @param key Key bytes. Couldn't be {@code null}. - * @param val Value bytes. Couldn't be {@code null}. - * @param rev Revision. - */ - // TODO: It seems user will never create Entry, so we can reduce constructor scope to protected or package-private and reuse it from two-place private constructor. - public Entry(@NotNull byte[] key, @NotNull byte[] val, long rev) { - assert key != null : "key can't be null"; - assert val != null : "value can't be null"; - - this.key = key; - this.val = val; - this.rev = rev; - } - - /** - * Constructor for empty and tombstone entries. - * - * @param key Key bytes. Couldn't be {@code null}. - * @param rev Revision. - */ - private Entry(@NotNull byte[] key, long rev) { - assert key != null : "key can't be null"; - - this.key = key; - this.val = null; - this.rev = rev; - } - - /** - * Creates an instance of empty entry for a given key. - * - * @param key Key bytes. Couldn't be {@code null}. - * @return Empty entry. - */ - @NotNull - public static Entry empty(byte[] key) { - return new Entry(key, 0); - } - - /** - * Creates an instance of tombstone entry for a given key and a revision. - * - * @param key Key bytes. Couldn't be {@code null}. - * @return Empty entry. - */ - @NotNull - public static Entry tombstone(byte[] key, long rev) { - assert rev > 0 : "rev must be positive for tombstone entry."; - - return new Entry(key, rev); - } - +public interface Entry { /** * Returns a key. * - * @return Key. + * @return The key. */ - @NotNull - public byte[] key() { - return key; - } + @NotNull byte[] key(); /** - * Returns a value. + * Returns a value. Could be {@code null} for empty entry. * * @return Value. */ - @Nullable - public byte[] value() { - return val; - } + @Nullable byte[] value(); /** * Returns a revision. - * @return Revision. - */ - public long revision() { - return rev; - } - - /** - * Returns value which denotes whether entry is tombstone or not. * - * @return {@code True} if entry is tombstone, otherwise - {@code false}. - */ - public boolean tombstone() { - return val == null && rev > 0; - } - - /** - * Returns value which denotes whether entry is empty or not. - * - * @return {@code True} if entry is empty, otherwise - {@code false}. + * @return Revision. */ - public boolean empty() { - return val == null && rev == 0; - } + long revision(); } diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/ValueCondition.java b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/OperationTimeoutException.java similarity index 50% rename from modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/ValueCondition.java rename to modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/OperationTimeoutException.java index d437c87..4edcf02 100644 --- a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/ValueCondition.java +++ b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/OperationTimeoutException.java @@ -18,32 +18,42 @@ package org.apache.ignite.metastorage.common; /** - * Condition, intended for value evaluation. + * Thrown when an operation is not executed within a specified time period. Usually in such cases the operation + * should be retried. */ -public class ValueCondition extends AbstractCondition { - /** Value. */ - private final byte[] val; +public class OperationTimeoutException extends RuntimeException { + /** + * Constructs an exception. + */ + public OperationTimeoutException() { + super(); + } /** - * Constructor. + * Constructs an exception with a given message. * - * @param type Condition type. - * @param val Value for comparison. + * @param message Detail message. */ - public ValueCondition(ConditionType type, byte[] val) { - super(type); + public OperationTimeoutException(String message) { + super(message); + } - this.val = val; + /** + * Constructs an exception with a given message and a cause. + * + * @param message Detail message. + * @param cause Cause. + */ + public OperationTimeoutException(String message, Throwable cause) { + super(message, cause); } /** - * Compares a given value with a target one in lexicographical manner. + * Constructs an exception with a given cause. * - * @param e A target entry. - * @return Comparison result as defined {@link java.util.Comparator} contract. + * @param cause Cause. */ - // TODO: Actually, value could be compared in different manners. So we should have possibility to define comparison logic. - @Override protected int compare(Entry e) { - return LexicographicComparator.INSTANCE.compare(e.value(), val); + public OperationTimeoutException(Throwable cause) { + super(cause); } } diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/PutUpdate.java b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/PutUpdate.java deleted file mode 100644 index d1012df..0000000 --- a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/PutUpdate.java +++ /dev/null @@ -1,47 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.metastorage.common; - -import org.jetbrains.annotations.NotNull; - -/** - * Put (write) update. - */ -public class PutUpdate implements Update { - /** Key. */ - @NotNull - private final byte[] key; - - /** Value. */ - @NotNull - private final byte[] val; - - /** - * Constructor. - * - * @param key A target key which will be updated. - * @param val A target value which will be written for a given key. - */ - public PutUpdate(@NotNull byte[] key, @NotNull byte[] val) { - assert key != null : "key can't be null"; - assert val != null : "value can't be null"; - - this.key = key; - this.val = val; - } -} diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/RevisionCondition.java b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/RevisionCondition.java deleted file mode 100644 index cd08c9c..0000000 --- a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/RevisionCondition.java +++ /dev/null @@ -1,48 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one or more - * contributor license agreements. See the NOTICE file distributed with - * this work for additional information regarding copyright ownership. - * The ASF licenses this file to You under the Apache License, Version 2.0 - * (the "License"); you may not use this file except in compliance with - * the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.ignite.metastorage.common; - -/** - * Condition, intended for revision evaluation. - */ -public class RevisionCondition extends AbstractCondition { - /** Revision. */ - private final long rev; - - /** - * Constructor. - * - * @param type Condition type. - * @param rev Revision for comparison. - */ - public RevisionCondition(ConditionType type, long rev) { - super(type); - - this.rev = rev; - } - - /** - * Compares a given revision with a target one. - * - * @param e A target entry. - * @return Comparison result as defined {@link java.util.Comparator} contract. - */ - @Override protected int compare(Entry e) { - return Long.compare(e.revision(), rev); - } -} diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Update.java b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Update.java index 58b211c..d5a6b46 100644 --- a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Update.java +++ b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Update.java @@ -18,7 +18,36 @@ package org.apache.ignite.metastorage.common; /** - * Defines update command for metastorage conditional update. + * Defines update command for meta storage conditional update. */ -public interface Update { -} +public final class Update { + private final InnerUpdate upd; + + Update(InnerUpdate upd) { + this.upd = upd; + } + + public static final class RemoveUpdate implements InnerUpdate { + RemoveUpdate() { + // No-op. + } + } + + public static final class PutUpdate implements InnerUpdate { + private final byte[] val; + + PutUpdate(byte[] val) { + this.val = val; + } + } + + public static final class NoOpUpdate implements InnerUpdate { + NoOpUpdate() { + // No-op. + } + } + + private interface InnerUpdate { + // Marker interface. + } +} \ No newline at end of file diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/RemoveUpdate.java b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Updates.java similarity index 70% rename from modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/RemoveUpdate.java rename to modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Updates.java index 09e227c..afa7cbe 100644 --- a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/RemoveUpdate.java +++ b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Updates.java @@ -17,22 +17,20 @@ package org.apache.ignite.metastorage.common; -import org.jetbrains.annotations.NotNull; +public final class Updates { + public static Update remove() { + return new Update(new Update.RemoveUpdate()); + } -/** - * Remove update. - */ -public class RemoveUpdate implements Update { - /** Key. */ - @NotNull - private final byte[] key; + public static Update put(byte[] value) { + return new Update(new Update.PutUpdate(value)); + } + + public static Update noop() { + return new Update(new Update.NoOpUpdate()); + } - /** - * Constructor. - * - * @param key A target key which will be removed. - */ - public RemoveUpdate(@NotNull byte[] key) { - this.key = key; + private Updates() { + // No-op. } }