This is an automated email from the ASF dual-hosted git repository. agura pushed a commit to branch ignite-14389 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 73176e2350a47f77e4ea8f086746e7bfca2bf28d Author: Andrey Gura <[email protected]> AuthorDate: Tue May 4 18:37:10 2021 +0300 IGNITE-14389 Implemented conditional update (invoke) --- .../ignite/internal/affinity/AffinityManager.java | 8 +- .../metastorage/client/MetaStorageService.java | 27 +- .../ignite/metastorage/common/Condition.java | 133 +++------ .../ignite/metastorage/common/Conditions.java | 10 +- .../ignite/metastorage/common/Operation.java | 35 ++- .../ignite/metastorage/common/Operations.java | 10 +- modules/metastorage-server/pom.xml | 6 +- .../metastorage/server/AbstractCondition.java | 13 + .../internal/metastorage/server/Condition.java | 7 + .../metastorage/server/KeyValueStorage.java | 2 + .../internal/metastorage/server/Operation.java | 31 ++ .../metastorage/server/RevisionCondition.java | 75 +++++ .../server/SimpleInMemoryKeyValueStorage.java | 95 +++++- .../metastorage/server/ValueCondition.java | 49 ++++ .../metastorage/server/RevisionConditionTest.java | 65 +++++ .../server/SimpleInMemoryKeyValueStorageTest.java | 323 +++++++++++++++++++++ .../metastorage/server/ValueConditionTest.java | 27 ++ .../internal/metastorage/MetaStorageManager.java | 10 +- .../apache/ignite/internal/app/IgnitionImpl.java | 51 ++-- .../internal/table/distributed/TableManager.java | 8 +- 20 files changed, 817 insertions(+), 168 deletions(-) diff --git a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java index 937af33..61cf7c5 100644 --- a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java +++ b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java @@ -149,9 +149,11 @@ public class AffinityManager { int replicas = configurationMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY) .tables().get(name).replicas().value(); - metaStorageMgr.invoke(evt.newEntry().key(), - Conditions.value().eq(evt.newEntry().value()), - Operations.put(ByteUtils.toBytes( + Key key = evt.newEntry().key(); + + metaStorageMgr.invoke( + Conditions.value(key).eq(evt.newEntry().value()), + Operations.put(key, ByteUtils.toBytes( RendezvousAffinityFunction.assignPartitions( baselineMgr.nodes(), partitions, diff --git a/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/MetaStorageService.java b/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/MetaStorageService.java index d4ded44..409f90d 100644 --- a/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/MetaStorageService.java +++ b/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/MetaStorageService.java @@ -202,7 +202,6 @@ public interface MetaStorageService { * * <p>Conditional update could be treated as <i>if(condition)-then(success)-else(failure)</i> expression.</p> * - * @param key The key. Couldn't be {@code null}. * @param condition The condition. * @param success The update which will be applied in case of condition evaluation yields {@code true}. * @param failure The update which will be applied in case of condition evaluation yields {@code false}. @@ -215,7 +214,7 @@ public interface MetaStorageService { */ // TODO: https://issues.apache.org/jira/browse/IGNITE-14269: will be replaced by conditional multi update. @NotNull - CompletableFuture<Boolean> invoke(@NotNull Key key, @NotNull Condition condition, + CompletableFuture<Boolean> invoke(@NotNull Condition condition, @NotNull Operation success, @NotNull Operation failure); /** @@ -223,7 +222,27 @@ public interface MetaStorageService { * * <p>Conditional update could be treated as <i>if(condition)-then(success)-else(failure)</i> expression.</p> * - * @param key The key. Couldn't be {@code null}. + * @param condition The condition. + * @param success The updates which will be applied in case of condition evaluation yields {@code true}. + * @param failure The updates which will be applied in case of condition evaluation yields {@code false}. + * @return Future result {@code true} if {@code success} update was applied, otherwise {@code false}. + * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result. + * @see Key + * @see Entry + * @see Condition + * @see Operation + */ + // TODO: https://issues.apache.org/jira/browse/IGNITE-14269: will be replaced by conditional multi update. + @NotNull + CompletableFuture<Boolean> invoke(@NotNull Condition condition, + @NotNull Collection<Operation> success, @NotNull Collection<Operation> failure); + + + /** + * Updates an entry for the given key conditionally. + * + * <p>Conditional update could be treated as <i>if(condition)-then(success)-else(failure)</i> expression.</p> + * * @param condition The condition. * @param success The update which will be applied in case of condition evaluation yields {@code true}. * @param failure The update which will be applied in case of condition evaluation yields {@code false}. @@ -236,7 +255,7 @@ public interface MetaStorageService { */ //TODO: https://issues.apache.org/jira/browse/IGNITE-14269: will be replaced by conditional multi update. @NotNull - CompletableFuture<Entry> getAndInvoke(@NotNull Key key, @NotNull Condition condition, + CompletableFuture<Entry> getAndInvoke(@NotNull Condition condition, @NotNull Operation success, @NotNull Operation failure); /** diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Condition.java b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Condition.java index 359829b..f7ab099 100644 --- a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Condition.java +++ b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Condition.java @@ -17,8 +17,6 @@ package org.apache.ignite.metastorage.common; -import java.util.Arrays; - /** * Represents a condition for conditional update. */ @@ -36,22 +34,11 @@ public final class Condition { } /** - * Tests the given entry on satisfaction of the condition. - * - * @param e Entry. - * @return The result of condition test. {@code true} - if the entry satisfies to the condition, - * otherwise - {@code false}. - */ - public boolean test(Entry e) { - return cond.test(e); - } - - /** * Represents condition on entry revision. Only one type of condition could be applied to * the one instance of condition. Subsequent invocations of any method which produces condition will throw * {@link IllegalStateException}. */ - public static final class RevisionCondition implements InnerCondition { + public static final class RevisionCondition extends AbstractCondition { /** * The type of condition. * @@ -65,10 +52,12 @@ public final class Condition { private long rev; /** - * Default no-op constructor. + * Constructs a condition by a revision for an entry identified by the given key. + * + * @param key Identifies an entry which condition will be applied to. */ - RevisionCondition() { - // No-op. + RevisionCondition(byte[] key) { + super(key); } /** @@ -173,66 +162,27 @@ public final class Condition { return new Condition(this); } - /** {@inheritDoc} */ - @Override public boolean test(Entry e) { - int res = Long.compare(e.revision(), rev); - - return type.test(res); - } - /** * Defines possible condition types which can be applied to the revision. */ enum Type { /** Equality condition type. */ - EQUAL { - @Override public boolean test(long res) { - return res == 0; - } - }, + EQUAL, /** Inequality condition type. */ - NOT_EQUAL { - @Override public boolean test(long res) { - return res != 0; - } - }, + NOT_EQUAL, /** Greater than condition type. */ - GREATER { - @Override public boolean test(long res) { - return res > 0; - } - }, + GREATER, /** Less than condition type. */ - LESS { - @Override public boolean test(long res) { - return res < 0; - } - }, + LESS, /** Less than or equal to condition type. */ - LESS_OR_EQUAL { - @Override public boolean test(long res) { - return res <= 0; - } - }, + LESS_OR_EQUAL, /** Greater than or equal to condition type. */ - GREATER_OR_EQUAL { - @Override public boolean test(long res) { - return res >= 0; - } - }; - - /** - * Interprets comparison result. - * - * @param res The result of comparison. - * @return The interpretation of the comparison result. - */ - public abstract boolean test(long res); + GREATER_OR_EQUAL } } @@ -241,7 +191,7 @@ public final class Condition { * the one instance of condition. Subsequent invocations of any method which produces condition will throw * {@link IllegalStateException}. */ - public static final class ValueCondition implements InnerCondition { + public static final class ValueCondition extends AbstractCondition { /** * The type of condition. * @@ -255,10 +205,12 @@ public final class Condition { private byte[] val; /** - * Default no-op constructor. + * Constructs a condition by a value for an entry identified by the given key. + * + * @param key Identifies an entry which condition will be applied to. */ - ValueCondition() { - // No-op. + ValueCondition(byte[] key) { + super(key); } /** @@ -295,38 +247,15 @@ public final class Condition { return new Condition(this); } - /** {@inheritDoc} */ - @Override public boolean test(Entry e) { - int res = Arrays.compare(e.value(), val); - - return type.test(res); - } - /** * Defines possible condition types which can be applied to the value. */ enum Type { /** Equality condition type. */ - EQUAL { - @Override public boolean test(long res) { - return res == 0; - } - }, + EQUAL, /** Inequality condition type. */ - NOT_EQUAL { - @Override public boolean test(long res) { - return res != 0; - } - }; - - /** - * Interprets comparison result. - * - * @param res The result of comparison. - * @return The interpretation of the comparison result. - */ - public abstract boolean test(long res); + NOT_EQUAL } } @@ -343,14 +272,24 @@ public final class Condition { /** * Defines condition interface. */ - private interface InnerCondition { + public interface InnerCondition { /** - * Tests the given entry on satisfaction of the condition. + * Returns key which identifies an entry which condition will be applied to. * - * @param e Entry. - * @return The result of condition test. {@code true} - if the entry satisfies to the condition, - * otherwise - {@code false}. + * @return Key which identifies an entry which condition will be applied to. */ - boolean test(Entry e); + byte[] key(); + } + + private static abstract class AbstractCondition implements InnerCondition { + private final byte[] key; + + public AbstractCondition(byte[] key) { + this.key = key; + } + + @Override public byte[] key() { + return key; + } } } diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Conditions.java b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Conditions.java index f83849a..567b20b 100644 --- a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Conditions.java +++ b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Conditions.java @@ -27,21 +27,23 @@ public final class Conditions { /** * Creates condition on entry revision. * + * @param key Identifies an entry which condition will be applied to. * @return Condition on entry revision. * @see Condition.RevisionCondition */ - public static Condition.RevisionCondition revision() { - return new Condition.RevisionCondition(); + public static Condition.RevisionCondition revision(Key key) { + return new Condition.RevisionCondition(key.bytes()); } /** * Creates condition on entry value. * + * @param key Identifies an entry which condition will be applied to. * @return Condition on entry value. * @see Condition.ValueCondition */ - public static Condition.ValueCondition value() { - return new Condition.ValueCondition(); + public static Condition.ValueCondition value(Key key) { + return new Condition.ValueCondition(key.bytes()); } /** diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Operation.java b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Operation.java index e085a28..57329f4 100644 --- a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Operation.java +++ b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Operation.java @@ -17,6 +17,8 @@ package org.apache.ignite.metastorage.common; +import org.jetbrains.annotations.Nullable; + /** * Defines operation for meta storage conditional update (invoke). */ @@ -37,28 +39,32 @@ public final class Operation { /** * Represents operation of type <i>remove</i>. */ - public static final class RemoveOp implements InnerOp { + public static final class RemoveOp extends AbstractOp { /** * Default no-op constructor. + * + * @param key Identifies an entry which operation will be applied to. */ - RemoveOp() { - // No-op. + RemoveOp(byte[] key) { + super(key); } } /** * Represents operation of type <i>put</i>. */ - public static final class PutOp implements InnerOp { + public static final class PutOp extends AbstractOp { /** Value. */ private final byte[] val; /** * Constructs operation of type <i>put</i>. * + * @param key Identifies an entry which operation will be applied to. * @param val The value to which the entry should be updated. */ - PutOp(byte[] val) { + PutOp(byte[] key, byte[] val) { + super(key); this.val = val; } } @@ -66,12 +72,12 @@ public final class Operation { /** * Represents operation of type <i>no-op</i>. */ - public static final class NoOp implements InnerOp { + public static final class NoOp extends AbstractOp { /** * Default no-op constructor. */ NoOp() { - // No-op. + super(null); } } @@ -79,6 +85,19 @@ public final class Operation { * Defines operation interface. */ private interface InnerOp { - // Marker interface. + @Nullable byte[] key(); + } + + private static class AbstractOp implements InnerOp { + @Nullable private final byte[] key; + + public AbstractOp(@Nullable byte[] key) { + this.key = key; + } + + @Nullable + @Override public byte[] key() { + return key; + } } } diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Operations.java b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Operations.java index 994f4bd..e51fa9e 100644 --- a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Operations.java +++ b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Operations.java @@ -30,20 +30,22 @@ public final class Operations { /** * Creates operation of type <i>remove</i>. This type of operation removes entry. * + * @param key Identifies an entry which operation will be applied to. * @return Operation of type <i>remove</i>. */ - public static Operation remove() { - return new Operation(new Operation.RemoveOp()); + public static Operation remove(Key key) { + return new Operation(new Operation.RemoveOp(key.bytes())); } /** * Creates operation of type <i>put</i>. This type of operation inserts or updates value of entry. * + * @param key Identifies an entry which operation will be applied to. * @param value Value. * @return Operation of type <i>put</i>. */ - public static Operation put(byte[] value) { - return new Operation(new Operation.PutOp(value)); + public static Operation put(Key key, byte[] value) { + return new Operation(new Operation.PutOp(key.bytes(), value)); } /** diff --git a/modules/metastorage-server/pom.xml b/modules/metastorage-server/pom.xml index d73d080..5f0f453 100644 --- a/modules/metastorage-server/pom.xml +++ b/modules/metastorage-server/pom.xml @@ -29,20 +29,18 @@ <relativePath>../../parent/pom.xml</relativePath> </parent> - <artifactId>metastorage-server</artifactId> + <artifactId>ignite-metastorage-server</artifactId> <version>3.0.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.apache.ignite</groupId> <artifactId>ignite-core</artifactId> - <version>${project.version}</version> </dependency> <dependency> <groupId>org.apache.ignite</groupId> - <artifactId>metastorage-common</artifactId> - <version>${project.version}</version> + <artifactId>ignite-metastorage-common</artifactId> </dependency> <dependency> diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractCondition.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractCondition.java new file mode 100644 index 0000000..55ec9e6 --- /dev/null +++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractCondition.java @@ -0,0 +1,13 @@ +package org.apache.ignite.internal.metastorage.server; + +public abstract class AbstractCondition implements Condition { + private final byte[] key; + + public AbstractCondition(byte[] key) { + this.key = key; + } + + @Override public byte[] key() { + return key; + } +} diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Condition.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Condition.java new file mode 100644 index 0000000..ea4fcc7 --- /dev/null +++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Condition.java @@ -0,0 +1,7 @@ +package org.apache.ignite.internal.metastorage.server; + +public interface Condition { + byte[] key(); + + boolean test(Entry e); +} diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java index 5d6da44..5659d3e 100644 --- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java +++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java @@ -59,6 +59,8 @@ public interface KeyValueStorage { @NotNull Collection<Entry> getAndRemoveAll(List<byte[]> keys); + boolean invoke(Condition condition, Collection<Operation> success, Collection<Operation> failure); + Cursor<Entry> range(byte[] keyFrom, byte[] keyTo); Cursor<Entry> range(byte[] keyFrom, byte[] keyTo, long revUpperBound); diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Operation.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Operation.java new file mode 100644 index 0000000..2d1fbfd --- /dev/null +++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Operation.java @@ -0,0 +1,31 @@ +package org.apache.ignite.internal.metastorage.server; + +public class Operation { + private final byte[] key; + private final byte[] val; + private final Type type; + + public Operation(Type type, byte[] key, byte[] val) { + this.key = key; + this.val = val; + this.type = type; + } + + byte[] key() { + return key; + } + + byte[] value() { + return val; + } + + Type type() { + return type; + } + + enum Type { + PUT, + REMOVE, + NO_OP + } +} diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/RevisionCondition.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/RevisionCondition.java new file mode 100644 index 0000000..95a0137 --- /dev/null +++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/RevisionCondition.java @@ -0,0 +1,75 @@ +package org.apache.ignite.internal.metastorage.server; + +public class RevisionCondition extends AbstractCondition { + private final Type type; + + private final long rev; + + public RevisionCondition(Type type, byte[] key, long rev) { + super(key); + this.type = type; + this.rev = rev; + } + + /** {@inheritDoc} */ + @Override public boolean test(Entry e) { + int res = Long.compare(e.revision(), rev); + + return type.test(res); + } + + /** + * Defines possible condition types which can be applied to the revision. + */ + public enum Type { + /** Equality condition type. */ + EQUAL { + @Override public boolean test(long res) { + return res == 0; + } + }, + + /** Inequality condition type. */ + NOT_EQUAL { + @Override public boolean test(long res) { + return res != 0; + } + }, + + /** Greater than condition type. */ + GREATER { + @Override public boolean test(long res) { + return res > 0; + } + }, + + /** Less than condition type. */ + LESS { + @Override public boolean test(long res) { + return res < 0; + } + }, + + /** Less than or equal to condition type. */ + LESS_OR_EQUAL { + @Override public boolean test(long res) { + return res <= 0; + } + }, + + /** Greater than or equal to condition type. */ + GREATER_OR_EQUAL { + @Override public boolean test(long res) { + return res >= 0; + } + }; + + /** + * Interprets comparison result. + * + * @param res The result of comparison. + * @return The interpretation of the comparison result. + */ + public abstract boolean test(long res); + } +} diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java index b37c96a..3033f2c 100644 --- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java +++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java @@ -29,6 +29,7 @@ import java.util.NoSuchElementException; import java.util.TreeMap; import java.util.TreeSet; import java.util.function.Predicate; + import org.apache.ignite.metastorage.common.Cursor; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.TestOnly; @@ -63,14 +64,22 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage { @Override public void put(byte[] key, byte[] value) { synchronized (mux) { - doPut(key, value); + long curRev = rev + 1; + + doPut(key, value, curRev); + + rev = curRev; } } @NotNull @Override public Entry getAndPut(byte[] key, byte[] bytes) { synchronized (mux) { - long lastRev = doPut(key, bytes); + long curRev = rev + 1; + + long lastRev = doPut(key, bytes, curRev); + + rev = curRev; // Return previous value. return doGetValue(key, lastRev); @@ -129,15 +138,24 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage { @Override public void remove(byte[] key) { synchronized (mux) { - Entry e = doGet(key, LATEST_REV, false); - - if (e.empty() || e.tombstone()) - return; + long curRev = rev + 1; - doPut(key, TOMBSTONE); + if (doRemove(key, curRev)) + rev = curRev; } } + private boolean doRemove(byte[] key, long curRev) { + Entry e = doGet(key, LATEST_REV, false); + + if (e.empty() || e.tombstone()) + return false; + + doPut(key, TOMBSTONE, curRev); + + return true; + } + @NotNull @Override public Entry getAndRemove(byte[] key) { synchronized (mux) { @@ -204,6 +222,47 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage { return res; } + @Override public boolean invoke(Condition condition, Collection<Operation> success, Collection<Operation> failure) { + synchronized (mux) { + Entry e = get(condition.key()); + + boolean branch = condition.test(e); + + Collection<Operation> ops = branch ? success : failure; + + long curRev = rev + 1; + + boolean modified = false; + + for (Operation op : ops) { + switch (op.type()) { + case PUT: + doPut(op.key(), op.value(), curRev); + + modified = true; + + break; + + case REMOVE: + modified |= doRemove(op.key(), curRev); + + break; + + case NO_OP: + break; + + default: + throw new IllegalArgumentException("Unknown operation type: " + op.type()); + } + } + + if (modified) + rev = curRev; + + return branch; + } + } + @Override public Cursor<Entry> range(byte[] keyFrom, byte[] keyTo) { return new RangeCursor(keyFrom, keyTo, rev); } @@ -365,9 +424,7 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage { return new Entry(key, lastVal.bytes() , lastRev, lastVal.updateCounter()); } - private long doPut(byte[] key, byte[] bytes) { - long curRev = ++rev; - + private long doPut(byte[] key, byte[] bytes, long curRev) { long curUpdCntr = ++updCntr; // Update keysIdx. @@ -378,13 +435,25 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage { revs.add(curRev); // Update revsIdx. - NavigableMap<byte[], Value> entries = new TreeMap<>(CMP); + //NavigableMap<byte[], Value> entries = new TreeMap<>(CMP); Value val = new Value(bytes, curUpdCntr); - entries.put(key, val); + //entries.put(key, val); - revsIdx.put(curRev, entries); + //revsIdx.put(curRev, entries); + + revsIdx.compute( + curRev, + (rev, entries) -> { + if (entries == null) + entries = new TreeMap<>(CMP); + + entries.put(key, val); + + return entries; + } + ); return lastRev; } diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/ValueCondition.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/ValueCondition.java new file mode 100644 index 0000000..dfd9f8e --- /dev/null +++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/ValueCondition.java @@ -0,0 +1,49 @@ +package org.apache.ignite.internal.metastorage.server; + +import java.util.Arrays; + +public class ValueCondition extends AbstractCondition { + private final Type type; + + private final byte[] val; + + public ValueCondition(Type type, byte[] key, byte[] val) { + super(key); + this.type = type; + this.val = val; + } + + /** {@inheritDoc} */ + @Override public boolean test(Entry e) { + int res = Arrays.compare(e.value(), val); + + return type.test(res); + } + + /** + * Defines possible condition types which can be applied to the value. + */ + enum Type { + /** Equality condition type. */ + EQUAL { + @Override public boolean test(long res) { + return res == 0; + } + }, + + /** Inequality condition type. */ + NOT_EQUAL { + @Override public boolean test(long res) { + return res != 0; + } + }; + + /** + * Interprets comparison result. + * + * @param res The result of comparison. + * @return The interpretation of the comparison result. + */ + public abstract boolean test(long res); + } +} diff --git a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/RevisionConditionTest.java b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/RevisionConditionTest.java new file mode 100644 index 0000000..3a08c13 --- /dev/null +++ b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/RevisionConditionTest.java @@ -0,0 +1,65 @@ +package org.apache.ignite.internal.metastorage.server; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class RevisionConditionTest { + private static final byte[] key = new byte[] {1}; + + private static final byte[] val = new byte[] {2}; + + @Test + public void eq() { + RevisionCondition cond = new RevisionCondition(RevisionCondition.Type.EQUAL, key, 1); + + // 1 == 1. + assertTrue(cond.test(new Entry(key, val, 1, 1))); + } + + @Test + public void ne() { + RevisionCondition cond = new RevisionCondition(RevisionCondition.Type.NOT_EQUAL, key, 1); + + // 2 != 1. + assertTrue(cond.test(new Entry(key, val, 2, 1))); + } + + @Test + public void gt() { + RevisionCondition cond = new RevisionCondition(RevisionCondition.Type.GREATER, key, 1); + + // 2 > 1. + assertTrue(cond.test(new Entry(key, val, 2, 1))); + } + + @Test + public void ge() { + RevisionCondition cond = new RevisionCondition(RevisionCondition.Type.GREATER_OR_EQUAL, key, 1); + + // 2 >= 1 (2 > 1). + assertTrue(cond.test(new Entry(key, val, 2, 1))); + + // 1 >= 1 (1 == 1). + assertTrue(cond.test(new Entry(key, val, 1, 1))); + } + + @Test + public void lt() { + RevisionCondition cond = new RevisionCondition(RevisionCondition.Type.LESS, key, 2); + + // 1 < 2 + assertTrue(cond.test(new Entry(key, val, 1, 1))); + } + + @Test + public void le() { + RevisionCondition cond = new RevisionCondition(RevisionCondition.Type.LESS_OR_EQUAL, key, 2); + + // 1 <= 2 (1 < 2) + assertTrue(cond.test(new Entry(key, val, 1, 1))); + + // 1 <= 1 (1 == 1). + assertTrue(cond.test(new Entry(key, val, 1, 1))); + } +} diff --git a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java index 27df790..2cda29b 100644 --- a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java +++ b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java @@ -939,6 +939,329 @@ class SimpleInMemoryKeyValueStorageTest { } @Test + public void invokeWithRevisionCondition_successBranch() { + byte[] key1 = k(1); + byte[] val1_1 = kv(1, 11); + byte[] val1_2 = kv(1, 12); + + byte[] key2 = k(2); + byte[] val2 = kv(2, 2); + + byte[] key3 = k(3); + byte[] val3 = kv(3, 3); + + assertEquals(0, storage.revision()); + assertEquals(0, storage.updateCounter()); + + storage.put(key1, val1_1); + + assertEquals(1, storage.revision()); + assertEquals(1, storage.updateCounter()); + + boolean branch = storage.invoke( + new RevisionCondition(RevisionCondition.Type.EQUAL, key1, 1), + List.of( + new Operation(Operation.Type.PUT, key1, val1_2), + new Operation(Operation.Type.PUT, key2, val2) + ), + List.of(new Operation(Operation.Type.PUT, key3, val3)) + ); + + // "Success" branch is applied. + assertTrue(branch); + assertEquals(2, storage.revision()); + assertEquals(3, storage.updateCounter()); + + Entry e1 = storage.get(key1); + + assertFalse(e1.empty()); + assertFalse(e1.tombstone()); + assertEquals(2, e1.revision()); + assertEquals(2, e1.updateCounter()); + assertArrayEquals(val1_2, e1.value()); + + Entry e2 = storage.get(key2); + + assertFalse(e2.empty()); + assertFalse(e2.tombstone()); + assertEquals(2, e2.revision()); + assertEquals(3, e2.updateCounter()); + assertArrayEquals(val2, e2.value()); + + // "Failure" branch isn't applied. + Entry e3 = storage.get(key3); + + assertTrue(e3.empty()); + } + + @Test + public void invokeWithRevisionCondition_failureBranch() { + byte[] key1 = k(1); + byte[] val1_1 = kv(1, 11); + byte[] val1_2 = kv(1, 12); + + byte[] key2 = k(2); + byte[] val2 = kv(2, 2); + + byte[] key3 = k(3); + byte[] val3 = kv(3, 3); + + assertEquals(0, storage.revision()); + assertEquals(0, storage.updateCounter()); + + storage.put(key1, val1_1); + + assertEquals(1, storage.revision()); + assertEquals(1, storage.updateCounter()); + + boolean branch = storage.invoke( + new RevisionCondition(RevisionCondition.Type.EQUAL, key1, 2), + List.of(new Operation(Operation.Type.PUT, key3, val3)), + List.of( + new Operation(Operation.Type.PUT, key1, val1_2), + new Operation(Operation.Type.PUT, key2, val2) + ) + ); + + // "Failure" branch is applied. + assertFalse(branch); + assertEquals(2, storage.revision()); + assertEquals(3, storage.updateCounter()); + + Entry e1 = storage.get(key1); + + assertFalse(e1.empty()); + assertFalse(e1.tombstone()); + assertEquals(2, e1.revision()); + assertEquals(2, e1.updateCounter()); + assertArrayEquals(val1_2, e1.value()); + + Entry e2 = storage.get(key2); + + assertFalse(e2.empty()); + assertFalse(e2.tombstone()); + assertEquals(2, e2.revision()); + assertEquals(3, e2.updateCounter()); + assertArrayEquals(val2, e2.value()); + + // "Success" branch isn't applied. + Entry e3 = storage.get(key3); + + assertTrue(e3.empty()); + } + + @Test + public void invokeWithValueCondition_successBranch() { + byte[] key1 = k(1); + byte[] val1_1 = kv(1, 11); + byte[] val1_2 = kv(1, 12); + + byte[] key2 = k(2); + byte[] val2 = kv(2, 2); + + byte[] key3 = k(3); + byte[] val3 = kv(3, 3); + + assertEquals(0, storage.revision()); + assertEquals(0, storage.updateCounter()); + + storage.put(key1, val1_1); + + assertEquals(1, storage.revision()); + assertEquals(1, storage.updateCounter()); + + boolean branch = storage.invoke( + new ValueCondition(ValueCondition.Type.EQUAL, key1, val1_1), + List.of( + new Operation(Operation.Type.PUT, key1, val1_2), + new Operation(Operation.Type.PUT, key2, val2) + ), + List.of(new Operation(Operation.Type.PUT, key3, val3)) + ); + + // "Success" branch is applied. + assertTrue(branch); + assertEquals(2, storage.revision()); + assertEquals(3, storage.updateCounter()); + + Entry e1 = storage.get(key1); + + assertFalse(e1.empty()); + assertFalse(e1.tombstone()); + assertEquals(2, e1.revision()); + assertEquals(2, e1.updateCounter()); + assertArrayEquals(val1_2, e1.value()); + + Entry e2 = storage.get(key2); + + assertFalse(e2.empty()); + assertFalse(e2.tombstone()); + assertEquals(2, e2.revision()); + assertEquals(3, e2.updateCounter()); + assertArrayEquals(val2, e2.value()); + + // "Failure" branch isn't applied. + Entry e3 = storage.get(key3); + + assertTrue(e3.empty()); + } + + @Test + public void invokeWithValueCondition_failureBranch() { + byte[] key1 = k(1); + byte[] val1_1 = kv(1, 11); + byte[] val1_2 = kv(1, 12); + + byte[] key2 = k(2); + byte[] val2 = kv(2, 2); + + byte[] key3 = k(3); + byte[] val3 = kv(3, 3); + + assertEquals(0, storage.revision()); + assertEquals(0, storage.updateCounter()); + + storage.put(key1, val1_1); + + assertEquals(1, storage.revision()); + assertEquals(1, storage.updateCounter()); + + boolean branch = storage.invoke( + new ValueCondition(ValueCondition.Type.EQUAL, key1, val1_2), + List.of(new Operation(Operation.Type.PUT, key3, val3)), + List.of( + new Operation(Operation.Type.PUT, key1, val1_2), + new Operation(Operation.Type.PUT, key2, val2) + ) + ); + + // "Failure" branch is applied. + assertFalse(branch); + assertEquals(2, storage.revision()); + assertEquals(3, storage.updateCounter()); + + Entry e1 = storage.get(key1); + + assertFalse(e1.empty()); + assertFalse(e1.tombstone()); + assertEquals(2, e1.revision()); + assertEquals(2, e1.updateCounter()); + assertArrayEquals(val1_2, e1.value()); + + Entry e2 = storage.get(key2); + + assertFalse(e2.empty()); + assertFalse(e2.tombstone()); + assertEquals(2, e2.revision()); + assertEquals(3, e2.updateCounter()); + assertArrayEquals(val2, e2.value()); + + // "Success" branch isn't applied. + Entry e3 = storage.get(key3); + + assertTrue(e3.empty()); + } + + @Test + public void invokeOperations() { + byte[] key1 = k(1); + byte[] val1 = kv(1, 1); + + byte[] key2 = k(2); + byte[] val2 = kv(2, 2); + + byte[] key3 = k(3); + byte[] val3 = kv(3, 3); + + assertEquals(0, storage.revision()); + assertEquals(0, storage.updateCounter()); + + storage.put(key1, val1); + + assertEquals(1, storage.revision()); + assertEquals(1, storage.updateCounter()); + + // No-op. + boolean branch = storage.invoke( + new ValueCondition(ValueCondition.Type.EQUAL, key1, val1), + List.of(new Operation(Operation.Type.NO_OP, null, null)), + List.of(new Operation(Operation.Type.NO_OP, null, null)) + ); + + assertTrue(branch); + + // No updates. + assertEquals(1, storage.revision()); + assertEquals(1, storage.updateCounter()); + + // Put. + branch = storage.invoke( + new ValueCondition(ValueCondition.Type.EQUAL, key1, val1), + List.of( + new Operation(Operation.Type.PUT, key2, val2), + new Operation(Operation.Type.PUT, key3, val3) + ), + List.of(new Operation(Operation.Type.NO_OP, null, null)) + ); + + assertTrue(branch); + + // +1 for revision, +2 for update counter. + assertEquals(2, storage.revision()); + assertEquals(3, storage.updateCounter()); + + Entry e2 = storage.get(key2); + + assertFalse(e2.empty()); + assertFalse(e2.tombstone()); + assertEquals(2, e2.revision()); + assertEquals(2, e2.updateCounter()); + assertArrayEquals(key2, e2.key()); + assertArrayEquals(val2, e2.value()); + + Entry e3 = storage.get(key3); + + assertFalse(e3.empty()); + assertFalse(e3.tombstone()); + assertEquals(2, e3.revision()); + assertEquals(3, e3.updateCounter()); + assertArrayEquals(key3, e3.key()); + assertArrayEquals(val3, e3.value()); + + // Remove. + branch = storage.invoke( + new ValueCondition(ValueCondition.Type.EQUAL, key1, val1), + List.of( + new Operation(Operation.Type.REMOVE, key2, null), + new Operation(Operation.Type.REMOVE, key3, null) + ), + List.of(new Operation(Operation.Type.NO_OP, null, null)) + ); + + assertTrue(branch); + + // +1 for revision, +2 for update counter. + assertEquals(3, storage.revision()); + assertEquals(5, storage.updateCounter()); + + e2 = storage.get(key2); + + assertFalse(e2.empty()); + assertTrue(e2.tombstone()); + assertEquals(3, e2.revision()); + assertEquals(4, e2.updateCounter()); + assertArrayEquals(key2, e2.key()); + + e3 = storage.get(key3); + + assertFalse(e3.empty()); + assertTrue(e3.tombstone()); + assertEquals(3, e3.revision()); + assertEquals(5, e3.updateCounter()); + assertArrayEquals(key3, e3.key()); + } + + @Test public void compact() { assertEquals(0, storage.revision()); assertEquals(0, storage.updateCounter()); diff --git a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/ValueConditionTest.java b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/ValueConditionTest.java new file mode 100644 index 0000000..717da54 --- /dev/null +++ b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/ValueConditionTest.java @@ -0,0 +1,27 @@ +package org.apache.ignite.internal.metastorage.server; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ValueConditionTest { + private static final byte[] key = new byte[] {1}; + + private static final byte[] val1 = new byte[] {11}; + + private static final byte[] val2 = new byte[] {22}; + + @Test + public void eq() { + ValueCondition cond = new ValueCondition(ValueCondition.Type.EQUAL, key, val1); + + assertTrue(cond.test(new Entry(key, val1, 1, 1))); + } + + @Test + public void ne() { + ValueCondition cond = new ValueCondition(ValueCondition.Type.NOT_EQUAL, key, val1); + + assertTrue(cond.test(new Entry(key, val2, 1, 1))); + } +} diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java index 4e06338..3a62146 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java @@ -303,27 +303,25 @@ import org.jetbrains.annotations.Nullable; } /** - * @see MetaStorageService#invoke(Key, Condition, Operation, Operation) + * @see MetaStorageService#invoke(Condition, Operation, Operation) */ public @NotNull CompletableFuture<Boolean> invoke( - @NotNull Key key, @NotNull Condition cond, @NotNull Operation success, @NotNull Operation failure ) { - return metaStorageSvc.invoke(key, cond, success, failure); + return metaStorageSvc.invoke(cond, success, failure); } /** - * @see MetaStorageService#getAndInvoke(Key, Condition, Operation, Operation) + * @see MetaStorageService#getAndInvoke(Condition, Operation, Operation) */ public @NotNull CompletableFuture<Entry> getAndInvoke( - @NotNull Key key, @NotNull Condition cond, @NotNull Operation success, @NotNull Operation failure ) { - return metaStorageSvc.getAndInvoke(key, cond, success, failure); + return metaStorageSvc.getAndInvoke(cond, success, failure); } /** diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java index 6c6b0f2..6dc1c9b 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java @@ -213,70 +213,79 @@ public class IgnitionImpl implements Ignition { private static MetaStorageService metaStorageServiceMock() { return new MetaStorageService() { @Override public @NotNull CompletableFuture<Entry> get(@NotNull Key key) { - throw new UnsupportedOperationException("Metastorage service is not implemented yet"); + throw new UnsupportedOperationException("Meta storage service is not implemented yet"); } @Override public @NotNull CompletableFuture<Entry> get(@NotNull Key key, long revUpperBound) { - throw new UnsupportedOperationException("Metastorage service is not implemented yet"); + throw new UnsupportedOperationException("Meta storage service is not implemented yet"); } @Override public @NotNull CompletableFuture<Map<Key, Entry>> getAll(Collection<Key> keys) { - throw new UnsupportedOperationException("Metastorage service is not implemented yet"); + throw new UnsupportedOperationException("Meta storage service is not implemented yet"); } @Override public @NotNull CompletableFuture<Map<Key, Entry>> getAll(Collection<Key> keys, long revUpperBound) { - throw new UnsupportedOperationException("Metastorage service is not implemented yet"); + throw new UnsupportedOperationException("Meta storage service is not implemented yet"); } @Override public @NotNull CompletableFuture<Void> put(@NotNull Key key, @NotNull byte[] value) { - throw new UnsupportedOperationException("Metastorage service is not implemented yet"); + throw new UnsupportedOperationException("Meta storage service is not implemented yet"); } @Override public @NotNull CompletableFuture<Entry> getAndPut(@NotNull Key key, @NotNull byte[] value) { - throw new UnsupportedOperationException("Metastorage service is not implemented yet"); + throw new UnsupportedOperationException("Meta storage service is not implemented yet"); } @Override public @NotNull CompletableFuture<Void> putAll(@NotNull Map<Key, byte[]> vals) { - throw new UnsupportedOperationException("Metastorage service is not implemented yet"); + throw new UnsupportedOperationException("Meta storage service is not implemented yet"); } @Override public @NotNull CompletableFuture<Map<Key, Entry>> getAndPutAll(@NotNull Map<Key, byte[]> vals) { - throw new UnsupportedOperationException("Metastorage service is not implemented yet"); + throw new UnsupportedOperationException("Meta storage service is not implemented yet"); } @Override public @NotNull CompletableFuture<Void> remove(@NotNull Key key) { - throw new UnsupportedOperationException("Metastorage service is not implemented yet"); + throw new UnsupportedOperationException("Meta storage service is not implemented yet"); } @Override public @NotNull CompletableFuture<Entry> getAndRemove(@NotNull Key key) { - throw new UnsupportedOperationException("Metastorage service is not implemented yet"); + throw new UnsupportedOperationException("Meta storage service is not implemented yet"); } @Override public @NotNull CompletableFuture<Void> removeAll(@NotNull Collection<Key> keys) { - throw new UnsupportedOperationException("Metastorage service is not implemented yet"); + throw new UnsupportedOperationException("Meta storage service is not implemented yet"); } @Override public @NotNull CompletableFuture<Map<Key, Entry>> getAndRemoveAll(@NotNull Collection<Key> keys) { - throw new UnsupportedOperationException("Metastorage service is not implemented yet"); + throw new UnsupportedOperationException("Meta storage service is not implemented yet"); } - @Override public @NotNull CompletableFuture<Boolean> invoke(@NotNull Key key, @NotNull Condition condition, + @Override public @NotNull CompletableFuture<Boolean> invoke(@NotNull Condition condition, @NotNull Operation success, @NotNull Operation failure) { - throw new UnsupportedOperationException("Metastorage service is not implemented yet"); + throw new UnsupportedOperationException("Meta storage service is not implemented yet"); } - @Override public @NotNull CompletableFuture<Entry> getAndInvoke(@NotNull Key key, @NotNull Condition condition, - @NotNull Operation success, @NotNull Operation failure) { - throw new UnsupportedOperationException("Metastorage service is not implemented yet"); + @Override public @NotNull CompletableFuture<Boolean> invoke(@NotNull Condition condition, + @NotNull Collection<Operation> success, + @NotNull Collection<Operation> failure + ) { + throw new UnsupportedOperationException("Meta storage service is not implemented yet"); + } + + @Override public @NotNull CompletableFuture<Entry> getAndInvoke(@NotNull Condition condition, + @NotNull Operation success, + @NotNull Operation failure + ) { + throw new UnsupportedOperationException("Meta storage service is not implemented yet"); } @Override public @NotNull Cursor<Entry> range(@NotNull Key keyFrom, @Nullable Key keyTo, long revUpperBound) { - throw new UnsupportedOperationException("Metastorage service is not implemented yet"); + throw new UnsupportedOperationException("Meta storage service is not implemented yet"); } @Override public @NotNull Cursor<Entry> range(@NotNull Key keyFrom, @Nullable Key keyTo) { - throw new UnsupportedOperationException("Metastorage service is not implemented yet"); + throw new UnsupportedOperationException("Meta storage service is not implemented yet"); } @Override public @NotNull CompletableFuture<IgniteUuid> watch(@Nullable Key keyFrom, @Nullable Key keyTo, @@ -295,11 +304,11 @@ public class IgnitionImpl implements Ignition { } @Override public @NotNull CompletableFuture<Void> stopWatch(@NotNull IgniteUuid id) { - throw new UnsupportedOperationException("Metastorage service is not implemented yet"); + throw new UnsupportedOperationException("Meta storage service is not implemented yet"); } @Override public @NotNull CompletableFuture<Void> compact() { - throw new UnsupportedOperationException("Metastorage service is not implemented yet"); + throw new UnsupportedOperationException("Meta storage service is not implemented yet"); } }; } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index 86a04e4..593ec5b 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -221,15 +221,15 @@ public class TableManager implements IgniteTables { UUID tblId = new UUID(revision, update); + Key key = new Key(INTERNAL_PREFIX + tblId); CompletableFuture<Boolean> fut = metaStorageMgr.invoke( - new Key(INTERNAL_PREFIX + tblId.toString()), - Conditions.value().eq(null), - Operations.put(tableView.name().getBytes(StandardCharsets.UTF_8)), + Conditions.value(key).eq(null), + Operations.put(key, tableView.name().getBytes(StandardCharsets.UTF_8)), Operations.noop()); try { if (fut.get()) { - metaStorageMgr.put(new Key(INTERNAL_PREFIX + "assignment." + tblId.toString()), new byte[0]); + metaStorageMgr.put(new Key(INTERNAL_PREFIX + "assignment." + tblId), new byte[0]); LOG.info("Table manager created a table [name={}, revision={}]", tableView.name(), revision);
