This is an automated email from the ASF dual-hosted git repository. agura pushed a commit to branch ignite-14389 in repository https://gitbox.apache.org/repos/asf/ignite-3.git
commit 88334926cd74e5aeaa825e9603df78909dd07e7e Author: Andrey Gura <[email protected]> AuthorDate: Tue May 4 18:37:10 2021 +0300 IGNITE-14389 Implemented conditional update (invoke) --- .../ignite/internal/affinity/AffinityManager.java | 17 +- .../metastorage/client/MetaStorageService.java | 32 +- .../ignite/metastorage/common/Condition.java | 147 +++------- .../ignite/metastorage/common/Conditions.java | 33 +-- .../ignite/metastorage/common/Operation.java | 45 +-- .../ignite/metastorage/common/Operations.java | 6 +- modules/metastorage-server/pom.xml | 6 +- .../metastorage/server/AbstractCondition.java | 13 + .../internal/metastorage/server/Condition.java | 7 + .../metastorage/server/KeyValueStorage.java | 2 + .../internal/metastorage/server/Operation.java | 31 ++ .../metastorage/server/RevisionCondition.java | 75 +++++ .../server/SimpleInMemoryKeyValueStorage.java | 95 +++++- .../metastorage/server/ValueCondition.java | 49 ++++ .../metastorage/server/RevisionConditionTest.java | 65 +++++ .../server/SimpleInMemoryKeyValueStorageTest.java | 323 +++++++++++++++++++++ .../metastorage/server/ValueConditionTest.java | 27 ++ .../internal/metastorage/MetaStorageManager.java | 23 +- .../apache/ignite/internal/app/IgnitionImpl.java | 56 ++-- .../internal/table/distributed/TableManager.java | 19 +- 20 files changed, 837 insertions(+), 234 deletions(-) diff --git a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java index 37690bf..e658ca1 100644 --- a/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java +++ b/modules/affinity/src/main/java/org/apache/ignite/internal/affinity/AffinityManager.java @@ -37,6 +37,7 @@ import org.apache.ignite.metastorage.common.Operations; import org.apache.ignite.metastorage.common.WatchEvent; import org.apache.ignite.metastorage.common.WatchListener; import org.jetbrains.annotations.NotNull; +import org.jetbrains.annotations.Nullable; /** * Affinity manager is responsible for affinity function related logic including calculating affinity assignments. @@ -143,16 +144,20 @@ public class AffinityManager { UUID tblId = UUID.fromString(placeholderValue); try { - String name = new String(vaultManager.get(ByteArray.fromString(INTERNAL_PREFIX + tblId.toString())).get().value(), StandardCharsets.UTF_8); + byte[] tblNameVal = vaultManager.get(ByteArray.fromString(INTERNAL_PREFIX + tblId)).get().value(); - int partitions = configurationMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY) - .tables().get(name).partitions().value(); - int replicas = configurationMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY) - .tables().get(name).replicas().value(); + String name = new String(tblNameVal, StandardCharsets.UTF_8); + + int partitions = configurationMgr.configurationRegistry() + .getConfiguration(TablesConfiguration.KEY).tables().get(name).partitions().value(); + + int replicas = configurationMgr.configurationRegistry() + .getConfiguration(TablesConfiguration.KEY).tables().get(name).replicas().value(); var key = evt.newEntry().key(); + metaStorageMgr.invoke( - Conditions.key(key).value().eq(assignmentVal), + Conditions.value(key).eq(assignmentVal), Operations.put(key, ByteUtils.toBytes( RendezvousAffinityFunction.assignPartitions( baselineMgr.nodes(), diff --git a/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/MetaStorageService.java b/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/MetaStorageService.java index 6c24a59..409f90d 100644 --- a/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/MetaStorageService.java +++ b/modules/metastorage-client/src/main/java/org/apache/ignite/metastorage/client/MetaStorageService.java @@ -203,9 +203,9 @@ public interface MetaStorageService { * <p>Conditional update could be treated as <i>if(condition)-then(success)-else(failure)</i> expression.</p> * * @param condition The condition. - * @param success Batch of updates which will be atomically applied in case of condition evaluation yields {@code true}. - * @param failure Batch of updates which will be atomically applied in case of condition evaluation yields {@code false}. - * @return Future result {@code true} if {@code success} updates were applied, otherwise {@code false}. + * @param success The update which will be applied in case of condition evaluation yields {@code true}. + * @param failure The update which will be applied in case of condition evaluation yields {@code false}. + * @return Future result {@code true} if {@code success} update was applied, otherwise {@code false}. * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result. * @see Key * @see Entry @@ -215,14 +215,34 @@ public interface MetaStorageService { // TODO: https://issues.apache.org/jira/browse/IGNITE-14269: will be replaced by conditional multi update. @NotNull CompletableFuture<Boolean> invoke(@NotNull Condition condition, - @NotNull Collection<Operation> success, @NotNull Collection<Operation> failure); + @NotNull Operation success, @NotNull Operation failure); + + /** + * Updates an entry for the given key conditionally. + * + * <p>Conditional update could be treated as <i>if(condition)-then(success)-else(failure)</i> expression.</p> + * + * @param condition The condition. + * @param success The updates which will be applied in case of condition evaluation yields {@code true}. + * @param failure The updates which will be applied in case of condition evaluation yields {@code false}. + * @return Future result {@code true} if {@code success} update was applied, otherwise {@code false}. + * @throws OperationTimeoutException If the operation is timed out. Will be thrown on getting future result. + * @see Key + * @see Entry + * @see Condition + * @see Operation + */ + // TODO: https://issues.apache.org/jira/browse/IGNITE-14269: will be replaced by conditional multi update. + @NotNull + CompletableFuture<Boolean> invoke(@NotNull Condition condition, + @NotNull Collection<Operation> success, @NotNull Collection<Operation> failure); + /** * Updates an entry for the given key conditionally. * * <p>Conditional update could be treated as <i>if(condition)-then(success)-else(failure)</i> expression.</p> * - * @param key The key. Couldn't be {@code null}. * @param condition The condition. * @param success The update which will be applied in case of condition evaluation yields {@code true}. * @param failure The update which will be applied in case of condition evaluation yields {@code false}. @@ -235,7 +255,7 @@ public interface MetaStorageService { */ //TODO: https://issues.apache.org/jira/browse/IGNITE-14269: will be replaced by conditional multi update. @NotNull - CompletableFuture<Entry> getAndInvoke(@NotNull Key key, @NotNull Condition condition, + CompletableFuture<Entry> getAndInvoke(@NotNull Condition condition, @NotNull Operation success, @NotNull Operation failure); /** diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Condition.java b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Condition.java index 54049de..1ddc13c 100644 --- a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Condition.java +++ b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Condition.java @@ -17,8 +17,6 @@ package org.apache.ignite.metastorage.common; -import java.util.Arrays; - /** * Represents a condition for conditional update. */ @@ -36,22 +34,11 @@ public final class Condition { } /** - * Tests the given entry on satisfaction of the condition. - * - * @param e Entry. - * @return The result of condition test. {@code true} - if the entry satisfies to the condition, - * otherwise - {@code false}. - */ - public boolean test(Entry e) { - return cond.test(e); - } - - /** * Represents condition on entry revision. Only one type of condition could be applied to * the one instance of condition. Subsequent invocations of any method which produces condition will throw * {@link IllegalStateException}. */ - public static final class RevisionCondition implements InnerCondition { + public static final class RevisionCondition extends AbstractCondition { /** * The type of condition. * @@ -62,16 +49,13 @@ public final class Condition { /** The revision as the condition argument. */ private long rev; - /** Key of entry, which will be tested for condition. */ - private final Key key; - /** - * Creates a new condition for the given {@code key}. + * Constructs a condition by a revision for an entry identified by the given key. * - * @param key Key of entry, to be tested for the condition. + * @param key Identifies an entry which condition will be applied to. */ - RevisionCondition(Key key) { - this.key = key; + RevisionCondition(byte[] key) { + super(key); } /** @@ -176,70 +160,27 @@ public final class Condition { return new Condition(this); } - /** {@inheritDoc} */ - @Override public boolean test(Entry e) { - if ((e.key() == key) || (e.key() != null && e.key().equals(key))) { - int res = Long.compare(e.revision(), rev); - - return type.test(res); - } - else - return false; - } - /** * Defines possible condition types which can be applied to the revision. */ enum Type { /** Equality condition type. */ - EQUAL { - @Override public boolean test(long res) { - return res == 0; - } - }, + EQUAL, /** Inequality condition type. */ - NOT_EQUAL { - @Override public boolean test(long res) { - return res != 0; - } - }, + NOT_EQUAL, /** Greater than condition type. */ - GREATER { - @Override public boolean test(long res) { - return res > 0; - } - }, + GREATER, /** Less than condition type. */ - LESS { - @Override public boolean test(long res) { - return res < 0; - } - }, + LESS, /** Less than or equal to condition type. */ - LESS_OR_EQUAL { - @Override public boolean test(long res) { - return res <= 0; - } - }, + LESS_OR_EQUAL, /** Greater than or equal to condition type. */ - GREATER_OR_EQUAL { - @Override public boolean test(long res) { - return res >= 0; - } - }; - - /** - * Interprets comparison result. - * - * @param res The result of comparison. - * @return The interpretation of the comparison result. - */ - public abstract boolean test(long res); + GREATER_OR_EQUAL } } @@ -248,7 +189,7 @@ public final class Condition { * the one instance of condition. Subsequent invocations of any method which produces condition will throw * {@link IllegalStateException}. */ - public static final class ValueCondition implements InnerCondition { + public static final class ValueCondition extends AbstractCondition { /** * The type of condition. * @@ -259,16 +200,13 @@ public final class Condition { /** The value as the condition argument. */ private byte[] val; - /** Key of entry, which will be tested for condition. */ - private final Key key; - /** - * Creates a new condition for the given {@code key}. + * Constructs a condition by a value for an entry identified by the given key. * - * @param key Key of entry, to be tested for the condition. + * @param key Identifies an entry which condition will be applied to. */ - ValueCondition(Key key) { - this.key = key; + ValueCondition(byte[] key) { + super(key); } /** @@ -305,42 +243,15 @@ public final class Condition { return new Condition(this); } - /** {@inheritDoc} */ - @Override public boolean test(Entry e) { - if ((e.key() == key) || (e.key() != null && e.key().equals(key))) { - int res = Arrays.compare(e.value(), val); - - return type.test(res); - } - else - return false; - } - /** * Defines possible condition types which can be applied to the value. */ enum Type { /** Equality condition type. */ - EQUAL { - @Override public boolean test(long res) { - return res == 0; - } - }, + EQUAL, /** Inequality condition type. */ - NOT_EQUAL { - @Override public boolean test(long res) { - return res != 0; - } - }; - - /** - * Interprets comparison result. - * - * @param res The result of comparison. - * @return The interpretation of the comparison result. - */ - public abstract boolean test(long res); + NOT_EQUAL } } @@ -357,14 +268,24 @@ public final class Condition { /** * Defines condition interface. */ - private interface InnerCondition { + public interface InnerCondition { /** - * Tests the given entry on satisfaction of the condition. + * Returns key which identifies an entry which condition will be applied to. * - * @param e Entry. - * @return The result of condition test. {@code true} - if the entry satisfies to the condition, - * otherwise - {@code false}. + * @return Key which identifies an entry which condition will be applied to. */ - boolean test(Entry e); + byte[] key(); + } + + private static abstract class AbstractCondition implements InnerCondition { + private final byte[] key; + + public AbstractCondition(byte[] key) { + this.key = key; + } + + @Override public byte[] key() { + return key; + } } } diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Conditions.java b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Conditions.java index 87fa238..567b20b 100644 --- a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Conditions.java +++ b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Conditions.java @@ -24,47 +24,26 @@ package org.apache.ignite.metastorage.common; * @see Condition */ public final class Conditions { - - /** Key. */ - private Key key; - - /** - * Creates new condition for entry with concrete key. - * - * @param key Key - */ - private Conditions(Key key) { - this.key = key; - } - /** * Creates condition on entry revision. * + * @param key Identifies an entry which condition will be applied to. * @return Condition on entry revision. * @see Condition.RevisionCondition */ - public Condition.RevisionCondition revision() { - return new Condition.RevisionCondition(key); + public static Condition.RevisionCondition revision(Key key) { + return new Condition.RevisionCondition(key.bytes()); } /** * Creates condition on entry value. * + * @param key Identifies an entry which condition will be applied to. * @return Condition on entry value. * @see Condition.ValueCondition */ - public Condition.ValueCondition value() { - return new Condition.ValueCondition(key); - } - - /** - * Creates key-based condition. - * - * @param key Key of condition. - * @return Key-based condition instance. - */ - public static Conditions key(Key key) { - return new Conditions(key); + public static Condition.ValueCondition value(Key key) { + return new Condition.ValueCondition(key.bytes()); } /** diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Operation.java b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Operation.java index 9810fe5..fd5428a 100644 --- a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Operation.java +++ b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Operation.java @@ -17,6 +17,8 @@ package org.apache.ignite.metastorage.common; +import org.jetbrains.annotations.Nullable; + /** * Defines operation for meta storage conditional update (invoke). */ @@ -37,37 +39,33 @@ public final class Operation { /** * Represents operation of type <i>remove</i>. */ - public static final class RemoveOp implements InnerOp { - /** Key. */ - private final Key key; - + public static final class RemoveOp extends AbstractOp { /** - * Creates a new remove operation for the given {@code key}. + * Default no-op constructor. * - * @param key Key. + * @param key Identifies an entry which operation will be applied to. */ - RemoveOp(Key key) { - this.key = key; + RemoveOp(byte[] key) { + super(key); } } /** * Represents operation of type <i>put</i>. */ - public static final class PutOp implements InnerOp { - /** Key. */ - private final Key key; - + public static final class PutOp extends AbstractOp { /** Value. */ private final byte[] val; /** * Constructs operation of type <i>put</i>. * + * @param key Identifies an entry which operation will be applied to. * @param val The value to which the entry should be updated. */ - PutOp(Key key, byte[] val) { - this.key = key; + PutOp(byte[] key, byte[] val) { + super(key); + this.val = val; } } @@ -75,12 +73,12 @@ public final class Operation { /** * Represents operation of type <i>no-op</i>. */ - public static final class NoOp implements InnerOp { + public static final class NoOp extends AbstractOp { /** * Default no-op constructor. */ NoOp() { - // No-op. + super(null); } } @@ -88,6 +86,19 @@ public final class Operation { * Defines operation interface. */ private interface InnerOp { - // Marker interface. + @Nullable byte[] key(); + } + + private static class AbstractOp implements InnerOp { + @Nullable private final byte[] key; + + public AbstractOp(@Nullable byte[] key) { + this.key = key; + } + + @Nullable + @Override public byte[] key() { + return key; + } } } diff --git a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Operations.java b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Operations.java index 31c7449..e51fa9e 100644 --- a/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Operations.java +++ b/modules/metastorage-common/src/main/java/org/apache/ignite/metastorage/common/Operations.java @@ -30,20 +30,22 @@ public final class Operations { /** * Creates operation of type <i>remove</i>. This type of operation removes entry. * + * @param key Identifies an entry which operation will be applied to. * @return Operation of type <i>remove</i>. */ public static Operation remove(Key key) { - return new Operation(new Operation.RemoveOp(key)); + return new Operation(new Operation.RemoveOp(key.bytes())); } /** * Creates operation of type <i>put</i>. This type of operation inserts or updates value of entry. * + * @param key Identifies an entry which operation will be applied to. * @param value Value. * @return Operation of type <i>put</i>. */ public static Operation put(Key key, byte[] value) { - return new Operation(new Operation.PutOp(key, value)); + return new Operation(new Operation.PutOp(key.bytes(), value)); } /** diff --git a/modules/metastorage-server/pom.xml b/modules/metastorage-server/pom.xml index d73d080..5f0f453 100644 --- a/modules/metastorage-server/pom.xml +++ b/modules/metastorage-server/pom.xml @@ -29,20 +29,18 @@ <relativePath>../../parent/pom.xml</relativePath> </parent> - <artifactId>metastorage-server</artifactId> + <artifactId>ignite-metastorage-server</artifactId> <version>3.0.0-SNAPSHOT</version> <dependencies> <dependency> <groupId>org.apache.ignite</groupId> <artifactId>ignite-core</artifactId> - <version>${project.version}</version> </dependency> <dependency> <groupId>org.apache.ignite</groupId> - <artifactId>metastorage-common</artifactId> - <version>${project.version}</version> + <artifactId>ignite-metastorage-common</artifactId> </dependency> <dependency> diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractCondition.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractCondition.java new file mode 100644 index 0000000..55ec9e6 --- /dev/null +++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/AbstractCondition.java @@ -0,0 +1,13 @@ +package org.apache.ignite.internal.metastorage.server; + +public abstract class AbstractCondition implements Condition { + private final byte[] key; + + public AbstractCondition(byte[] key) { + this.key = key; + } + + @Override public byte[] key() { + return key; + } +} diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Condition.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Condition.java new file mode 100644 index 0000000..ea4fcc7 --- /dev/null +++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Condition.java @@ -0,0 +1,7 @@ +package org.apache.ignite.internal.metastorage.server; + +public interface Condition { + byte[] key(); + + boolean test(Entry e); +} diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java index 5d6da44..5659d3e 100644 --- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java +++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/KeyValueStorage.java @@ -59,6 +59,8 @@ public interface KeyValueStorage { @NotNull Collection<Entry> getAndRemoveAll(List<byte[]> keys); + boolean invoke(Condition condition, Collection<Operation> success, Collection<Operation> failure); + Cursor<Entry> range(byte[] keyFrom, byte[] keyTo); Cursor<Entry> range(byte[] keyFrom, byte[] keyTo, long revUpperBound); diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Operation.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Operation.java new file mode 100644 index 0000000..2d1fbfd --- /dev/null +++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/Operation.java @@ -0,0 +1,31 @@ +package org.apache.ignite.internal.metastorage.server; + +public class Operation { + private final byte[] key; + private final byte[] val; + private final Type type; + + public Operation(Type type, byte[] key, byte[] val) { + this.key = key; + this.val = val; + this.type = type; + } + + byte[] key() { + return key; + } + + byte[] value() { + return val; + } + + Type type() { + return type; + } + + enum Type { + PUT, + REMOVE, + NO_OP + } +} diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/RevisionCondition.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/RevisionCondition.java new file mode 100644 index 0000000..95a0137 --- /dev/null +++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/RevisionCondition.java @@ -0,0 +1,75 @@ +package org.apache.ignite.internal.metastorage.server; + +public class RevisionCondition extends AbstractCondition { + private final Type type; + + private final long rev; + + public RevisionCondition(Type type, byte[] key, long rev) { + super(key); + this.type = type; + this.rev = rev; + } + + /** {@inheritDoc} */ + @Override public boolean test(Entry e) { + int res = Long.compare(e.revision(), rev); + + return type.test(res); + } + + /** + * Defines possible condition types which can be applied to the revision. + */ + public enum Type { + /** Equality condition type. */ + EQUAL { + @Override public boolean test(long res) { + return res == 0; + } + }, + + /** Inequality condition type. */ + NOT_EQUAL { + @Override public boolean test(long res) { + return res != 0; + } + }, + + /** Greater than condition type. */ + GREATER { + @Override public boolean test(long res) { + return res > 0; + } + }, + + /** Less than condition type. */ + LESS { + @Override public boolean test(long res) { + return res < 0; + } + }, + + /** Less than or equal to condition type. */ + LESS_OR_EQUAL { + @Override public boolean test(long res) { + return res <= 0; + } + }, + + /** Greater than or equal to condition type. */ + GREATER_OR_EQUAL { + @Override public boolean test(long res) { + return res >= 0; + } + }; + + /** + * Interprets comparison result. + * + * @param res The result of comparison. + * @return The interpretation of the comparison result. + */ + public abstract boolean test(long res); + } +} diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java index b37c96a..3033f2c 100644 --- a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java +++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorage.java @@ -29,6 +29,7 @@ import java.util.NoSuchElementException; import java.util.TreeMap; import java.util.TreeSet; import java.util.function.Predicate; + import org.apache.ignite.metastorage.common.Cursor; import org.jetbrains.annotations.NotNull; import org.jetbrains.annotations.TestOnly; @@ -63,14 +64,22 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage { @Override public void put(byte[] key, byte[] value) { synchronized (mux) { - doPut(key, value); + long curRev = rev + 1; + + doPut(key, value, curRev); + + rev = curRev; } } @NotNull @Override public Entry getAndPut(byte[] key, byte[] bytes) { synchronized (mux) { - long lastRev = doPut(key, bytes); + long curRev = rev + 1; + + long lastRev = doPut(key, bytes, curRev); + + rev = curRev; // Return previous value. return doGetValue(key, lastRev); @@ -129,15 +138,24 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage { @Override public void remove(byte[] key) { synchronized (mux) { - Entry e = doGet(key, LATEST_REV, false); - - if (e.empty() || e.tombstone()) - return; + long curRev = rev + 1; - doPut(key, TOMBSTONE); + if (doRemove(key, curRev)) + rev = curRev; } } + private boolean doRemove(byte[] key, long curRev) { + Entry e = doGet(key, LATEST_REV, false); + + if (e.empty() || e.tombstone()) + return false; + + doPut(key, TOMBSTONE, curRev); + + return true; + } + @NotNull @Override public Entry getAndRemove(byte[] key) { synchronized (mux) { @@ -204,6 +222,47 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage { return res; } + @Override public boolean invoke(Condition condition, Collection<Operation> success, Collection<Operation> failure) { + synchronized (mux) { + Entry e = get(condition.key()); + + boolean branch = condition.test(e); + + Collection<Operation> ops = branch ? success : failure; + + long curRev = rev + 1; + + boolean modified = false; + + for (Operation op : ops) { + switch (op.type()) { + case PUT: + doPut(op.key(), op.value(), curRev); + + modified = true; + + break; + + case REMOVE: + modified |= doRemove(op.key(), curRev); + + break; + + case NO_OP: + break; + + default: + throw new IllegalArgumentException("Unknown operation type: " + op.type()); + } + } + + if (modified) + rev = curRev; + + return branch; + } + } + @Override public Cursor<Entry> range(byte[] keyFrom, byte[] keyTo) { return new RangeCursor(keyFrom, keyTo, rev); } @@ -365,9 +424,7 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage { return new Entry(key, lastVal.bytes() , lastRev, lastVal.updateCounter()); } - private long doPut(byte[] key, byte[] bytes) { - long curRev = ++rev; - + private long doPut(byte[] key, byte[] bytes, long curRev) { long curUpdCntr = ++updCntr; // Update keysIdx. @@ -378,13 +435,25 @@ public class SimpleInMemoryKeyValueStorage implements KeyValueStorage { revs.add(curRev); // Update revsIdx. - NavigableMap<byte[], Value> entries = new TreeMap<>(CMP); + //NavigableMap<byte[], Value> entries = new TreeMap<>(CMP); Value val = new Value(bytes, curUpdCntr); - entries.put(key, val); + //entries.put(key, val); - revsIdx.put(curRev, entries); + //revsIdx.put(curRev, entries); + + revsIdx.compute( + curRev, + (rev, entries) -> { + if (entries == null) + entries = new TreeMap<>(CMP); + + entries.put(key, val); + + return entries; + } + ); return lastRev; } diff --git a/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/ValueCondition.java b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/ValueCondition.java new file mode 100644 index 0000000..dfd9f8e --- /dev/null +++ b/modules/metastorage-server/src/main/java/org/apache/ignite/internal/metastorage/server/ValueCondition.java @@ -0,0 +1,49 @@ +package org.apache.ignite.internal.metastorage.server; + +import java.util.Arrays; + +public class ValueCondition extends AbstractCondition { + private final Type type; + + private final byte[] val; + + public ValueCondition(Type type, byte[] key, byte[] val) { + super(key); + this.type = type; + this.val = val; + } + + /** {@inheritDoc} */ + @Override public boolean test(Entry e) { + int res = Arrays.compare(e.value(), val); + + return type.test(res); + } + + /** + * Defines possible condition types which can be applied to the value. + */ + enum Type { + /** Equality condition type. */ + EQUAL { + @Override public boolean test(long res) { + return res == 0; + } + }, + + /** Inequality condition type. */ + NOT_EQUAL { + @Override public boolean test(long res) { + return res != 0; + } + }; + + /** + * Interprets comparison result. + * + * @param res The result of comparison. + * @return The interpretation of the comparison result. + */ + public abstract boolean test(long res); + } +} diff --git a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/RevisionConditionTest.java b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/RevisionConditionTest.java new file mode 100644 index 0000000..3a08c13 --- /dev/null +++ b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/RevisionConditionTest.java @@ -0,0 +1,65 @@ +package org.apache.ignite.internal.metastorage.server; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class RevisionConditionTest { + private static final byte[] key = new byte[] {1}; + + private static final byte[] val = new byte[] {2}; + + @Test + public void eq() { + RevisionCondition cond = new RevisionCondition(RevisionCondition.Type.EQUAL, key, 1); + + // 1 == 1. + assertTrue(cond.test(new Entry(key, val, 1, 1))); + } + + @Test + public void ne() { + RevisionCondition cond = new RevisionCondition(RevisionCondition.Type.NOT_EQUAL, key, 1); + + // 2 != 1. + assertTrue(cond.test(new Entry(key, val, 2, 1))); + } + + @Test + public void gt() { + RevisionCondition cond = new RevisionCondition(RevisionCondition.Type.GREATER, key, 1); + + // 2 > 1. + assertTrue(cond.test(new Entry(key, val, 2, 1))); + } + + @Test + public void ge() { + RevisionCondition cond = new RevisionCondition(RevisionCondition.Type.GREATER_OR_EQUAL, key, 1); + + // 2 >= 1 (2 > 1). + assertTrue(cond.test(new Entry(key, val, 2, 1))); + + // 1 >= 1 (1 == 1). + assertTrue(cond.test(new Entry(key, val, 1, 1))); + } + + @Test + public void lt() { + RevisionCondition cond = new RevisionCondition(RevisionCondition.Type.LESS, key, 2); + + // 1 < 2 + assertTrue(cond.test(new Entry(key, val, 1, 1))); + } + + @Test + public void le() { + RevisionCondition cond = new RevisionCondition(RevisionCondition.Type.LESS_OR_EQUAL, key, 2); + + // 1 <= 2 (1 < 2) + assertTrue(cond.test(new Entry(key, val, 1, 1))); + + // 1 <= 1 (1 == 1). + assertTrue(cond.test(new Entry(key, val, 1, 1))); + } +} diff --git a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java index 27df790..2cda29b 100644 --- a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java +++ b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/SimpleInMemoryKeyValueStorageTest.java @@ -939,6 +939,329 @@ class SimpleInMemoryKeyValueStorageTest { } @Test + public void invokeWithRevisionCondition_successBranch() { + byte[] key1 = k(1); + byte[] val1_1 = kv(1, 11); + byte[] val1_2 = kv(1, 12); + + byte[] key2 = k(2); + byte[] val2 = kv(2, 2); + + byte[] key3 = k(3); + byte[] val3 = kv(3, 3); + + assertEquals(0, storage.revision()); + assertEquals(0, storage.updateCounter()); + + storage.put(key1, val1_1); + + assertEquals(1, storage.revision()); + assertEquals(1, storage.updateCounter()); + + boolean branch = storage.invoke( + new RevisionCondition(RevisionCondition.Type.EQUAL, key1, 1), + List.of( + new Operation(Operation.Type.PUT, key1, val1_2), + new Operation(Operation.Type.PUT, key2, val2) + ), + List.of(new Operation(Operation.Type.PUT, key3, val3)) + ); + + // "Success" branch is applied. + assertTrue(branch); + assertEquals(2, storage.revision()); + assertEquals(3, storage.updateCounter()); + + Entry e1 = storage.get(key1); + + assertFalse(e1.empty()); + assertFalse(e1.tombstone()); + assertEquals(2, e1.revision()); + assertEquals(2, e1.updateCounter()); + assertArrayEquals(val1_2, e1.value()); + + Entry e2 = storage.get(key2); + + assertFalse(e2.empty()); + assertFalse(e2.tombstone()); + assertEquals(2, e2.revision()); + assertEquals(3, e2.updateCounter()); + assertArrayEquals(val2, e2.value()); + + // "Failure" branch isn't applied. + Entry e3 = storage.get(key3); + + assertTrue(e3.empty()); + } + + @Test + public void invokeWithRevisionCondition_failureBranch() { + byte[] key1 = k(1); + byte[] val1_1 = kv(1, 11); + byte[] val1_2 = kv(1, 12); + + byte[] key2 = k(2); + byte[] val2 = kv(2, 2); + + byte[] key3 = k(3); + byte[] val3 = kv(3, 3); + + assertEquals(0, storage.revision()); + assertEquals(0, storage.updateCounter()); + + storage.put(key1, val1_1); + + assertEquals(1, storage.revision()); + assertEquals(1, storage.updateCounter()); + + boolean branch = storage.invoke( + new RevisionCondition(RevisionCondition.Type.EQUAL, key1, 2), + List.of(new Operation(Operation.Type.PUT, key3, val3)), + List.of( + new Operation(Operation.Type.PUT, key1, val1_2), + new Operation(Operation.Type.PUT, key2, val2) + ) + ); + + // "Failure" branch is applied. + assertFalse(branch); + assertEquals(2, storage.revision()); + assertEquals(3, storage.updateCounter()); + + Entry e1 = storage.get(key1); + + assertFalse(e1.empty()); + assertFalse(e1.tombstone()); + assertEquals(2, e1.revision()); + assertEquals(2, e1.updateCounter()); + assertArrayEquals(val1_2, e1.value()); + + Entry e2 = storage.get(key2); + + assertFalse(e2.empty()); + assertFalse(e2.tombstone()); + assertEquals(2, e2.revision()); + assertEquals(3, e2.updateCounter()); + assertArrayEquals(val2, e2.value()); + + // "Success" branch isn't applied. + Entry e3 = storage.get(key3); + + assertTrue(e3.empty()); + } + + @Test + public void invokeWithValueCondition_successBranch() { + byte[] key1 = k(1); + byte[] val1_1 = kv(1, 11); + byte[] val1_2 = kv(1, 12); + + byte[] key2 = k(2); + byte[] val2 = kv(2, 2); + + byte[] key3 = k(3); + byte[] val3 = kv(3, 3); + + assertEquals(0, storage.revision()); + assertEquals(0, storage.updateCounter()); + + storage.put(key1, val1_1); + + assertEquals(1, storage.revision()); + assertEquals(1, storage.updateCounter()); + + boolean branch = storage.invoke( + new ValueCondition(ValueCondition.Type.EQUAL, key1, val1_1), + List.of( + new Operation(Operation.Type.PUT, key1, val1_2), + new Operation(Operation.Type.PUT, key2, val2) + ), + List.of(new Operation(Operation.Type.PUT, key3, val3)) + ); + + // "Success" branch is applied. + assertTrue(branch); + assertEquals(2, storage.revision()); + assertEquals(3, storage.updateCounter()); + + Entry e1 = storage.get(key1); + + assertFalse(e1.empty()); + assertFalse(e1.tombstone()); + assertEquals(2, e1.revision()); + assertEquals(2, e1.updateCounter()); + assertArrayEquals(val1_2, e1.value()); + + Entry e2 = storage.get(key2); + + assertFalse(e2.empty()); + assertFalse(e2.tombstone()); + assertEquals(2, e2.revision()); + assertEquals(3, e2.updateCounter()); + assertArrayEquals(val2, e2.value()); + + // "Failure" branch isn't applied. + Entry e3 = storage.get(key3); + + assertTrue(e3.empty()); + } + + @Test + public void invokeWithValueCondition_failureBranch() { + byte[] key1 = k(1); + byte[] val1_1 = kv(1, 11); + byte[] val1_2 = kv(1, 12); + + byte[] key2 = k(2); + byte[] val2 = kv(2, 2); + + byte[] key3 = k(3); + byte[] val3 = kv(3, 3); + + assertEquals(0, storage.revision()); + assertEquals(0, storage.updateCounter()); + + storage.put(key1, val1_1); + + assertEquals(1, storage.revision()); + assertEquals(1, storage.updateCounter()); + + boolean branch = storage.invoke( + new ValueCondition(ValueCondition.Type.EQUAL, key1, val1_2), + List.of(new Operation(Operation.Type.PUT, key3, val3)), + List.of( + new Operation(Operation.Type.PUT, key1, val1_2), + new Operation(Operation.Type.PUT, key2, val2) + ) + ); + + // "Failure" branch is applied. + assertFalse(branch); + assertEquals(2, storage.revision()); + assertEquals(3, storage.updateCounter()); + + Entry e1 = storage.get(key1); + + assertFalse(e1.empty()); + assertFalse(e1.tombstone()); + assertEquals(2, e1.revision()); + assertEquals(2, e1.updateCounter()); + assertArrayEquals(val1_2, e1.value()); + + Entry e2 = storage.get(key2); + + assertFalse(e2.empty()); + assertFalse(e2.tombstone()); + assertEquals(2, e2.revision()); + assertEquals(3, e2.updateCounter()); + assertArrayEquals(val2, e2.value()); + + // "Success" branch isn't applied. + Entry e3 = storage.get(key3); + + assertTrue(e3.empty()); + } + + @Test + public void invokeOperations() { + byte[] key1 = k(1); + byte[] val1 = kv(1, 1); + + byte[] key2 = k(2); + byte[] val2 = kv(2, 2); + + byte[] key3 = k(3); + byte[] val3 = kv(3, 3); + + assertEquals(0, storage.revision()); + assertEquals(0, storage.updateCounter()); + + storage.put(key1, val1); + + assertEquals(1, storage.revision()); + assertEquals(1, storage.updateCounter()); + + // No-op. + boolean branch = storage.invoke( + new ValueCondition(ValueCondition.Type.EQUAL, key1, val1), + List.of(new Operation(Operation.Type.NO_OP, null, null)), + List.of(new Operation(Operation.Type.NO_OP, null, null)) + ); + + assertTrue(branch); + + // No updates. + assertEquals(1, storage.revision()); + assertEquals(1, storage.updateCounter()); + + // Put. + branch = storage.invoke( + new ValueCondition(ValueCondition.Type.EQUAL, key1, val1), + List.of( + new Operation(Operation.Type.PUT, key2, val2), + new Operation(Operation.Type.PUT, key3, val3) + ), + List.of(new Operation(Operation.Type.NO_OP, null, null)) + ); + + assertTrue(branch); + + // +1 for revision, +2 for update counter. + assertEquals(2, storage.revision()); + assertEquals(3, storage.updateCounter()); + + Entry e2 = storage.get(key2); + + assertFalse(e2.empty()); + assertFalse(e2.tombstone()); + assertEquals(2, e2.revision()); + assertEquals(2, e2.updateCounter()); + assertArrayEquals(key2, e2.key()); + assertArrayEquals(val2, e2.value()); + + Entry e3 = storage.get(key3); + + assertFalse(e3.empty()); + assertFalse(e3.tombstone()); + assertEquals(2, e3.revision()); + assertEquals(3, e3.updateCounter()); + assertArrayEquals(key3, e3.key()); + assertArrayEquals(val3, e3.value()); + + // Remove. + branch = storage.invoke( + new ValueCondition(ValueCondition.Type.EQUAL, key1, val1), + List.of( + new Operation(Operation.Type.REMOVE, key2, null), + new Operation(Operation.Type.REMOVE, key3, null) + ), + List.of(new Operation(Operation.Type.NO_OP, null, null)) + ); + + assertTrue(branch); + + // +1 for revision, +2 for update counter. + assertEquals(3, storage.revision()); + assertEquals(5, storage.updateCounter()); + + e2 = storage.get(key2); + + assertFalse(e2.empty()); + assertTrue(e2.tombstone()); + assertEquals(3, e2.revision()); + assertEquals(4, e2.updateCounter()); + assertArrayEquals(key2, e2.key()); + + e3 = storage.get(key3); + + assertFalse(e3.empty()); + assertTrue(e3.tombstone()); + assertEquals(3, e3.revision()); + assertEquals(5, e3.updateCounter()); + assertArrayEquals(key3, e3.key()); + } + + @Test public void compact() { assertEquals(0, storage.revision()); assertEquals(0, storage.updateCounter()); diff --git a/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/ValueConditionTest.java b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/ValueConditionTest.java new file mode 100644 index 0000000..717da54 --- /dev/null +++ b/modules/metastorage-server/src/test/java/org/apache/ignite/internal/metastorage/server/ValueConditionTest.java @@ -0,0 +1,27 @@ +package org.apache.ignite.internal.metastorage.server; + +import org.junit.jupiter.api.Test; + +import static org.junit.jupiter.api.Assertions.assertTrue; + +public class ValueConditionTest { + private static final byte[] key = new byte[] {1}; + + private static final byte[] val1 = new byte[] {11}; + + private static final byte[] val2 = new byte[] {22}; + + @Test + public void eq() { + ValueCondition cond = new ValueCondition(ValueCondition.Type.EQUAL, key, val1); + + assertTrue(cond.test(new Entry(key, val1, 1, 1))); + } + + @Test + public void ne() { + ValueCondition cond = new ValueCondition(ValueCondition.Type.NOT_EQUAL, key, val1); + + assertTrue(cond.test(new Entry(key, val2, 1, 1))); + } +} diff --git a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java index 669b937..fca2125 100644 --- a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java +++ b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java @@ -18,7 +18,6 @@ package org.apache.ignite.internal.metastorage; import java.util.Collection; -import java.util.Collections; import java.util.Map; import java.util.Optional; import java.util.concurrent.CompletableFuture; @@ -308,37 +307,36 @@ import org.jetbrains.annotations.Nullable; /** * Invoke with single success/failure operation. * - * @see MetaStorageService#invoke(Condition, Collection, Collection) + * @see MetaStorageService#invoke(Condition, Operation, Operation) */ public @NotNull CompletableFuture<Boolean> invoke( @NotNull Condition cond, @NotNull Operation success, @NotNull Operation failure ) { - return metaStorageSvc.invoke(cond, Collections.singletonList(success), Collections.singletonList(failure)); + return metaStorageSvc.invoke(cond, success, failure); } /** * @see MetaStorageService#invoke(Condition, Collection, Collection) */ public @NotNull CompletableFuture<Boolean> invoke( - @NotNull Condition cond, - @NotNull Collection<Operation> success, - @NotNull Collection<Operation> failure + @NotNull Condition cond, + @NotNull Collection<Operation> success, + @NotNull Collection<Operation> failure ) { return metaStorageSvc.invoke(cond, success, failure); } /** - * @see MetaStorageService#getAndInvoke(Key, Condition, Operation, Operation) + * @see MetaStorageService#getAndInvoke(Condition, Operation, Operation) */ public @NotNull CompletableFuture<Entry> getAndInvoke( - @NotNull Key key, @NotNull Condition cond, @NotNull Operation success, @NotNull Operation failure ) { - return metaStorageSvc.getAndInvoke(key, cond, success, failure); + return metaStorageSvc.getAndInvoke(cond, success, failure); } /** @@ -429,9 +427,10 @@ import org.jetbrains.annotations.Nullable; try { return vaultMgr.putAll(entries.stream().collect( Collectors.toMap( - e -> ByteArray.fromString(e.getKey().toString()), - IgniteBiTuple::getValue)), - revision); + e -> ByteArray.fromString(e.getKey().toString()), + IgniteBiTuple::getValue) + ), + revision); } catch (IgniteInternalCheckedException e) { throw new IgniteInternalException("Couldn't put entries with considered revision.", e); diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java index 4e608eb..8fccaba 100644 --- a/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java +++ b/modules/runner/src/main/java/org/apache/ignite/internal/app/IgnitionImpl.java @@ -26,7 +26,6 @@ import java.util.List; import java.util.Map; import java.util.UUID; import java.util.concurrent.CompletableFuture; -import java.util.stream.Collectors; import org.apache.ignite.app.Ignite; import org.apache.ignite.app.Ignition; import org.apache.ignite.configuration.RootKey; @@ -202,9 +201,7 @@ public class IgnitionImpl implements Ignition { private static void ackBanner() { String ver = IgniteProperties.get(VER_KEY); - String banner = Arrays - .stream(BANNER) - .collect(Collectors.joining("\n")); + String banner = String.join("\n", BANNER); LOG.info(banner + '\n' + " ".repeat(22) + "Apache Ignite ver. " + ver + '\n'); } @@ -213,70 +210,79 @@ public class IgnitionImpl implements Ignition { private static MetaStorageService metaStorageServiceMock() { return new MetaStorageService() { @Override public @NotNull CompletableFuture<Entry> get(@NotNull Key key) { - throw new UnsupportedOperationException("Metastorage service is not implemented yet"); + throw new UnsupportedOperationException("Meta storage service is not implemented yet"); } @Override public @NotNull CompletableFuture<Entry> get(@NotNull Key key, long revUpperBound) { - throw new UnsupportedOperationException("Metastorage service is not implemented yet"); + throw new UnsupportedOperationException("Meta storage service is not implemented yet"); } @Override public @NotNull CompletableFuture<Map<Key, Entry>> getAll(Collection<Key> keys) { - throw new UnsupportedOperationException("Metastorage service is not implemented yet"); + throw new UnsupportedOperationException("Meta storage service is not implemented yet"); } @Override public @NotNull CompletableFuture<Map<Key, Entry>> getAll(Collection<Key> keys, long revUpperBound) { - throw new UnsupportedOperationException("Metastorage service is not implemented yet"); + throw new UnsupportedOperationException("Meta storage service is not implemented yet"); } @Override public @NotNull CompletableFuture<Void> put(@NotNull Key key, @NotNull byte[] value) { - throw new UnsupportedOperationException("Metastorage service is not implemented yet"); + throw new UnsupportedOperationException("Meta storage service is not implemented yet"); } @Override public @NotNull CompletableFuture<Entry> getAndPut(@NotNull Key key, @NotNull byte[] value) { - throw new UnsupportedOperationException("Metastorage service is not implemented yet"); + throw new UnsupportedOperationException("Meta storage service is not implemented yet"); } @Override public @NotNull CompletableFuture<Void> putAll(@NotNull Map<Key, byte[]> vals) { - throw new UnsupportedOperationException("Metastorage service is not implemented yet"); + throw new UnsupportedOperationException("Meta storage service is not implemented yet"); } @Override public @NotNull CompletableFuture<Map<Key, Entry>> getAndPutAll(@NotNull Map<Key, byte[]> vals) { - throw new UnsupportedOperationException("Metastorage service is not implemented yet"); + throw new UnsupportedOperationException("Meta storage service is not implemented yet"); } @Override public @NotNull CompletableFuture<Void> remove(@NotNull Key key) { - throw new UnsupportedOperationException("Metastorage service is not implemented yet"); + throw new UnsupportedOperationException("Meta storage service is not implemented yet"); } @Override public @NotNull CompletableFuture<Entry> getAndRemove(@NotNull Key key) { - throw new UnsupportedOperationException("Metastorage service is not implemented yet"); + throw new UnsupportedOperationException("Meta storage service is not implemented yet"); } @Override public @NotNull CompletableFuture<Void> removeAll(@NotNull Collection<Key> keys) { - throw new UnsupportedOperationException("Metastorage service is not implemented yet"); + throw new UnsupportedOperationException("Meta storage service is not implemented yet"); } @Override public @NotNull CompletableFuture<Map<Key, Entry>> getAndRemoveAll(@NotNull Collection<Key> keys) { - throw new UnsupportedOperationException("Metastorage service is not implemented yet"); + throw new UnsupportedOperationException("Meta storage service is not implemented yet"); } @Override public @NotNull CompletableFuture<Boolean> invoke(@NotNull Condition condition, - @NotNull Collection<Operation> success, @NotNull Collection<Operation> failure) { - throw new UnsupportedOperationException("Metastorage service is not implemented yet"); + @NotNull Operation success, @NotNull Operation failure) { + throw new UnsupportedOperationException("Meta storage service is not implemented yet"); } - @Override public @NotNull CompletableFuture<Entry> getAndInvoke(@NotNull Key key, @NotNull Condition condition, - @NotNull Operation success, @NotNull Operation failure) { - throw new UnsupportedOperationException("Metastorage service is not implemented yet"); + @Override public @NotNull CompletableFuture<Boolean> invoke(@NotNull Condition condition, + @NotNull Collection<Operation> success, + @NotNull Collection<Operation> failure + ) { + throw new UnsupportedOperationException("Meta storage service is not implemented yet"); + } + + @Override public @NotNull CompletableFuture<Entry> getAndInvoke(@NotNull Condition condition, + @NotNull Operation success, + @NotNull Operation failure + ) { + throw new UnsupportedOperationException("Meta storage service is not implemented yet"); } @Override public @NotNull Cursor<Entry> range(@NotNull Key keyFrom, @Nullable Key keyTo, long revUpperBound) { - throw new UnsupportedOperationException("Metastorage service is not implemented yet"); + throw new UnsupportedOperationException("Meta storage service is not implemented yet"); } @Override public @NotNull Cursor<Entry> range(@NotNull Key keyFrom, @Nullable Key keyTo) { - throw new UnsupportedOperationException("Metastorage service is not implemented yet"); + throw new UnsupportedOperationException("Meta storage service is not implemented yet"); } @Override public @NotNull CompletableFuture<IgniteUuid> watch(@Nullable Key keyFrom, @Nullable Key keyTo, @@ -295,11 +301,11 @@ public class IgnitionImpl implements Ignition { } @Override public @NotNull CompletableFuture<Void> stopWatch(@NotNull IgniteUuid id) { - throw new UnsupportedOperationException("Metastorage service is not implemented yet"); + throw new UnsupportedOperationException("Meta storage service is not implemented yet"); } @Override public @NotNull CompletableFuture<Void> compact() { - throw new UnsupportedOperationException("Metastorage service is not implemented yet"); + throw new UnsupportedOperationException("Meta storage service is not implemented yet"); } }; } diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java index 012d57b..dd5375f 100644 --- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java +++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java @@ -81,7 +81,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp private CompletableFuture<Long> tableCreationSubscriptionFut; /** Tables. */ - private Map<String, TableImpl> tables = new ConcurrentHashMap<>(); + private final Map<String, TableImpl> tables = new ConcurrentHashMap<>(); /* * @param configurationMgr Configuration manager. @@ -228,7 +228,7 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp configurationMgr.configurationRegistry().getConfiguration(TablesConfiguration.KEY) .tables().listen(ctx -> { Set<String> tablesToStart = ctx.newValue().namedListKeys() == null ? - Collections.EMPTY_SET : ctx.newValue().namedListKeys(); + Collections.emptySet() : ctx.newValue().namedListKeys(); tablesToStart.removeAll(ctx.oldValue().namedListKeys()); @@ -242,18 +242,19 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp UUID tblId = new UUID(revision, update); - var key = new Key(INTERNAL_PREFIX + tblId.toString()); + var key = new Key(INTERNAL_PREFIX + tblId); + futs.add(metaStorageMgr.invoke( - Conditions.key(key).value().eq(null), + Conditions.value(key).eq(null), Operations.put(key, tableView.name().getBytes(StandardCharsets.UTF_8)), Operations.noop()).thenCompose(res -> - res ? metaStorageMgr.put(new Key(INTERNAL_PREFIX + "assignment." + tblId.toString()), new byte[0]) + res ? metaStorageMgr.put(new Key(INTERNAL_PREFIX + "assignment." + tblId), new byte[0]) .thenApply(v -> true) : CompletableFuture.completedFuture(false))); } Set<String> tablesToStop = ctx.oldValue().namedListKeys() == null ? - Collections.EMPTY_SET : ctx.oldValue().namedListKeys(); + Collections.emptySet() : ctx.oldValue().namedListKeys(); tablesToStop.removeAll(ctx.newValue().namedListKeys()); @@ -262,12 +263,12 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp UUID tblId = t.internalTable().tableId(); - var key = new Key(INTERNAL_PREFIX + "assignment." + tblId.toString()); + var key = new Key(INTERNAL_PREFIX + "assignment." + tblId); futs.add(metaStorageMgr.invoke( - Conditions.key(key).value().ne(null), + Conditions.value(key).ne(null), Operations.remove(key), Operations.noop()).thenCompose(res -> - res ? metaStorageMgr.remove(new Key(INTERNAL_PREFIX + tblId.toString())) + res ? metaStorageMgr.remove(new Key(INTERNAL_PREFIX + tblId)) .thenApply(v -> true) : CompletableFuture.completedFuture(false))); }
