This is an automated email from the ASF dual-hosted git repository.
tkalkirill pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new 6936b4c978 IGNITE-19129 Remove parallel Watch processing (#1863)
6936b4c978 is described below
commit 6936b4c978525391ba9928c42c7cf5fa2ddba606
Author: Alexander Polovtcev <[email protected]>
AuthorDate: Thu Mar 30 15:32:12 2023 +0300
IGNITE-19129 Remove parallel Watch processing (#1863)
---
.../internal/catalog/CatalogServiceImpl.java | 8 --
.../internal/testframework/IgniteTestUtils.java | 135 +++++++++------------
.../distributionzones/DistributionZoneManager.java | 11 +-
...butionZoneManagerLogicalTopologyEventsTest.java | 2 -
.../DistributionZoneManagerScaleUpTest.java | 2 +-
.../DistributionZoneManagerWatchListenerTest.java | 2 +-
.../internal/metastorage/MetaStorageManager.java | 6 +-
.../ignite/internal/metastorage/WatchEvent.java | 6 +-
.../ignite/internal/metastorage/WatchListener.java | 6 -
.../impl/ItMetaStorageManagerImplTest.java | 24 +---
.../impl/ItMetaStorageMultipleNodesTest.java | 5 -
.../metastorage/impl/ItMetaStorageWatchTest.java | 25 ----
.../metastorage/impl/MetaStorageManagerImpl.java | 53 +++++---
.../server/OnRevisionAppliedCallback.java | 7 +-
.../ignite/internal/metastorage/server/Watch.java | 54 ++-------
.../metastorage/server/WatchProcessor.java | 132 +++++++++++++-------
.../server/AbstractKeyValueStorageTest.java | 23 +---
.../server/RocksDbKeyValueStorageTest.java | 12 +-
.../metastorage/server/WatchProcessorTest.java | 65 +++++-----
.../placementdriver/AssignmentsTracker.java | 5 -
.../internal/placementdriver/LeaseTracker.java | 5 -
.../ItDistributedConfigurationStorageTest.java | 2 +-
.../internal/test/WatchListenerInhibitor.java | 26 ++--
.../storage/DistributedConfigurationStorage.java | 16 +--
.../DistributedConfigurationCatchUpTest.java | 11 +-
.../internal/table/distributed/TableManager.java | 20 ---
26 files changed, 275 insertions(+), 388 deletions(-)
diff --git
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
index 2d9ce8c768..e3a5472a4e 100644
---
a/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
+++
b/modules/catalog/src/main/java/org/apache/ignite/internal/catalog/CatalogServiceImpl.java
@@ -152,19 +152,11 @@ public class CatalogServiceImpl implements
CatalogService, CatalogManager {
* MetaStorage event listener for catalog metadata updates.
*/
private static class CatalogEventListener implements WatchListener {
- /** {@inheritDoc} */
- @Override
- public String id() {
- return "catalog-history-watch";
- }
-
- /** {@inheritDoc} */
@Override
public CompletableFuture<Void> onUpdate(WatchEvent event) {
return completedFuture(null);
}
- /** {@inheritDoc} */
@Override
public void onError(Throwable e) {
LOG.warn("Unable to process catalog update event", e);
diff --git
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
index f353230f9a..5d23ff9b3b 100644
---
a/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
+++
b/modules/core/src/testFixtures/java/org/apache/ignite/internal/testframework/IgniteTestUtils.java
@@ -51,7 +51,6 @@ import org.apache.ignite.internal.thread.NamedThreadFactory;
import org.apache.ignite.internal.util.ExceptionUtils;
import org.apache.ignite.lang.IgniteInternalException;
import org.apache.ignite.lang.IgniteStringFormatter;
-import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;
import org.junit.jupiter.api.TestInfo;
@@ -76,7 +75,7 @@ public final class IgniteTestUtils {
assert fieldName != null;
try {
- Class<?> cls = obj instanceof Class ? (Class) obj : obj.getClass();
+ Class<?> cls = obj instanceof Class ? (Class<?>) obj :
obj.getClass();
Field field = cls.getDeclaredField(fieldName);
@@ -93,7 +92,7 @@ public final class IgniteTestUtils {
throw new IgniteInternalException("Modification of static
final field through reflection.");
}
- if (!field.isAccessible()) {
+ if (!field.canAccess(obj)) {
field.setAccessible(true);
}
@@ -112,13 +111,13 @@ public final class IgniteTestUtils {
* @param val New field value.
* @throws IgniteInternalException In case of error.
*/
- public static void setFieldValue(Object obj, Class cls, String fieldName,
Object val) throws IgniteInternalException {
+ public static void setFieldValue(Object obj, Class<?> cls, String
fieldName, Object val) throws IgniteInternalException {
assert fieldName != null;
try {
Field field = cls.getDeclaredField(fieldName);
- if (!field.isAccessible()) {
+ if (!field.canAccess(obj)) {
field.setAccessible(true);
}
@@ -150,14 +149,14 @@ public final class IgniteTestUtils {
}
/**
- * Returns field value.
+ * Finds a field in the given {@code target} object of the {@code
declaredClass} type.
*
- * @param target target object from which to get field value
({@code null} for static methods)
+ * @param target target object from which to get field ({@code
null} for static methods)
* @param declaredClass class on which the field is declared
* @param fieldName name of the field
- * @return field value
+ * @return field
*/
- public static Object getFieldValue(Object target, Class<?> declaredClass,
String fieldName) {
+ public static Field getField(@Nullable Object target, Class<?>
declaredClass, String fieldName) {
Field field;
try {
field = declaredClass.getDeclaredField(fieldName);
@@ -165,12 +164,24 @@ public final class IgniteTestUtils {
throw new IgniteInternalException("Did not find a field", e);
}
- if (!field.isAccessible()) {
+ if (!field.canAccess(target)) {
field.setAccessible(true);
}
+ return field;
+ }
+
+ /**
+ * Returns field value.
+ *
+ * @param target target object from which to get field value
({@code null} for static methods)
+ * @param declaredClass class on which the field is declared
+ * @param fieldName name of the field
+ * @return field value
+ */
+ public static Object getFieldValue(@Nullable Object target, Class<?>
declaredClass, String fieldName) {
try {
- return field.get(target);
+ return getField(target, declaredClass, fieldName).get(target);
} catch (IllegalAccessException e) {
throw new IgniteInternalException("Cannot get field value", e);
}
@@ -185,60 +196,30 @@ public final class IgniteTestUtils {
* @return Field value.
* @throws IgniteInternalException In case of error.
*/
- public static <T> T getFieldValue(Object obj, String... fieldNames) {
+ public static <T> T getFieldValue(@Nullable Object obj, String...
fieldNames) {
assert obj != null;
assert fieldNames != null;
assert fieldNames.length >= 1;
- try {
- for (String fieldName : fieldNames) {
- Class<?> cls = obj instanceof Class ? (Class) obj :
obj.getClass();
-
- try {
- obj = findField(cls, obj, fieldName);
- } catch (NoSuchFieldException e) {
- // Resolve inner class, if not an inner field.
- Class<?> innerCls = getInnerClass(cls, fieldName);
-
- if (innerCls == null) {
- throw new IgniteInternalException("Failed to get
object field [obj=" + obj
- + ", fieldNames=" +
Arrays.toString(fieldNames) + ']', e);
- }
+ for (String fieldName : fieldNames) {
+ Class<?> cls = obj instanceof Class ? (Class<?>) obj :
obj.getClass();
- obj = innerCls;
+ try {
+ obj = getFieldValue(obj, cls, fieldName);
+ } catch (IgniteInternalException e) {
+ // Resolve inner class, if not an inner field.
+ Class<?> innerCls = getInnerClass(cls, fieldName);
+
+ if (innerCls == null) {
+ throw new IgniteInternalException("Failed to get object
field [obj=" + obj
+ + ", fieldNames=" + Arrays.toString(fieldNames) +
']', e);
}
- }
- return (T) obj;
- } catch (IllegalAccessException e) {
- throw new IgniteInternalException("Failed to get object field
[obj=" + obj
- + ", fieldNames=" + Arrays.toString(fieldNames) + ']', e);
- }
- }
-
- /**
- * Get object field value via reflection.
- *
- * @param cls Class for searching.
- * @param obj Target object.
- * @param fieldName Field name for search.
- * @return Field from object if it was found.
- */
- private static Object findField(
- Class<?> cls,
- Object obj,
- String fieldName
- ) throws NoSuchFieldException, IllegalAccessException {
- // Resolve inner field.
- Field field = cls.getDeclaredField(fieldName);
-
- boolean accessible = field.isAccessible();
-
- if (!accessible) {
- field.setAccessible(true);
+ obj = innerCls;
+ }
}
- return field.get(obj);
+ return (T) obj;
}
/**
@@ -266,8 +247,8 @@ public final class IgniteTestUtils {
* @return Thrown throwable.
*/
public static Throwable assertThrowsWithCause(
- @NotNull RunnableX run,
- @NotNull Class<? extends Throwable> cls
+ RunnableX run,
+ Class<? extends Throwable> cls
) {
return assertThrowsWithCause(run, cls, null);
}
@@ -281,8 +262,8 @@ public final class IgniteTestUtils {
* @return Thrown throwable.
*/
public static Throwable assertThrowsWithCause(
- @NotNull RunnableX run,
- @NotNull Class<? extends Throwable> cls,
+ RunnableX run,
+ Class<? extends Throwable> cls,
@Nullable String msg
) {
try {
@@ -310,8 +291,8 @@ public final class IgniteTestUtils {
* @return {@code True} if one of the causing exception is an instance of
passed in classes, {@code false} otherwise.
*/
public static boolean hasCause(
- @NotNull Throwable t,
- @NotNull Class<?> cls,
+ Throwable t,
+ Class<?> cls,
@Nullable String messageFragment
) {
for (Throwable th = t; th != null; th = th.getCause()) {
@@ -349,10 +330,7 @@ public final class IgniteTestUtils {
* @param cls Cause classes to check.
* @return reference to the cause error if found, otherwise returns {@code
null}.
*/
- public static <T extends Throwable> T cause(
- @NotNull Throwable t,
- @NotNull Class<T> cls
- ) {
+ public static <T extends Throwable> @Nullable T cause(Throwable t,
Class<T> cls) {
return cause(t, cls, null);
}
@@ -367,9 +345,9 @@ public final class IgniteTestUtils {
* @param msg Message text that should be in cause (if {@code null},
message won't be checked).
* @return reference to the cause error if found, otherwise returns {@code
null}.
*/
- public static <T extends Throwable> T cause(
- @NotNull Throwable t,
- @NotNull Class<T> cls,
+ public static <T extends Throwable> @Nullable T cause(
+ Throwable t,
+ Class<T> cls,
@Nullable String msg
) {
for (Throwable th = t; th != null; th = th.getCause()) {
@@ -399,7 +377,7 @@ public final class IgniteTestUtils {
* @param task Runnable.
* @return Future with task result.
*/
- public static CompletableFuture<?> runAsync(final RunnableX task) {
+ public static CompletableFuture<?> runAsync(RunnableX task) {
return runAsync(task, "async-runnable-runner");
}
@@ -409,7 +387,7 @@ public final class IgniteTestUtils {
* @param task Runnable.
* @return Future with task result.
*/
- public static CompletableFuture<?> runAsync(final RunnableX task, String
threadName) {
+ public static CompletableFuture<?> runAsync(RunnableX task, String
threadName) {
return runAsync(() -> {
try {
task.run();
@@ -427,7 +405,7 @@ public final class IgniteTestUtils {
* @param task Callable.
* @return Future with task result.
*/
- public static <T> CompletableFuture<T> runAsync(final Callable<T> task) {
+ public static <T> CompletableFuture<T> runAsync(Callable<T> task) {
return runAsync(task, "async-callable-runner");
}
@@ -438,10 +416,10 @@ public final class IgniteTestUtils {
* @param threadName Thread name.
* @return Future with task result.
*/
- public static <T> CompletableFuture<T> runAsync(final Callable<T> task,
String threadName) {
- final NamedThreadFactory thrFactory = new
NamedThreadFactory(threadName, LOG);
+ public static <T> CompletableFuture<T> runAsync(Callable<T> task, String
threadName) {
+ NamedThreadFactory thrFactory = new NamedThreadFactory(threadName,
LOG);
- final CompletableFuture<T> fut = new CompletableFuture<T>();
+ CompletableFuture<T> fut = new CompletableFuture<T>();
thrFactory.newThread(() -> {
try {
@@ -556,7 +534,7 @@ public final class IgniteTestUtils {
* @param threadName Thread names.
* @return Future for the run. Future returns execution time in
milliseconds.
*/
- public static CompletableFuture<Long> runMultiThreadedAsync(Callable<?>
call, int threadNum, final String threadName) {
+ public static CompletableFuture<Long> runMultiThreadedAsync(Callable<?>
call, int threadNum, String threadName) {
List<Callable<?>> calls = Collections.<Callable<?>>nCopies(threadNum,
call);
NamedThreadFactory threadFactory = new NamedThreadFactory(threadName,
LOG);
@@ -724,7 +702,6 @@ public final class IgniteTestUtils {
* <p>This method erases type of the exception in the thrown clause, so
checked exception could be thrown without need to wrap it with
* unchecked one or adding a similar throws clause to the upstream methods.
*/
- @SuppressWarnings("unchecked")
public static <E extends Throwable> void sneakyThrow(Throwable e) throws E
{
throw (E) e;
}
@@ -739,7 +716,7 @@ public final class IgniteTestUtils {
* @return A result of the stage.
*/
@SuppressWarnings("UnusedReturnValue")
- public static <T> T await(CompletionStage<T> stage, long timeout, TimeUnit
unit) {
+ public static <T> @Nullable T await(CompletionStage<T> stage, long
timeout, TimeUnit unit) {
try {
return stage.toCompletableFuture().get(timeout, unit);
} catch (Throwable e) {
@@ -771,7 +748,7 @@ public final class IgniteTestUtils {
* @return A result of the stage.
*/
@SuppressWarnings("UnusedReturnValue")
- public static <T> T await(CompletionStage<T> stage) {
+ public static <T> @Nullable T await(CompletionStage<T> stage) {
return await(stage, TIMEOUT_SEC, TimeUnit.SECONDS);
}
diff --git
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
index 9ad4d43253..08ceb1accb 100644
---
a/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
+++
b/modules/distribution-zones/src/main/java/org/apache/ignite/internal/distributionzones/DistributionZoneManager.java
@@ -122,8 +122,6 @@ public class DistributionZoneManager implements
IgniteComponent {
private static final String DISTRIBUTION_ZONE_MANAGER_POOL_NAME =
"dst-zones-scheduler";
- private static final String META_STORAGE_WATCH_ID = "dst-zones-watch";
-
/** Id of the default distribution zone. */
public static final int DEFAULT_ZONE_ID = 0;
@@ -893,12 +891,14 @@ public class DistributionZoneManager implements
IgniteComponent {
try {
vaultMgr.get(zonesLogicalTopologyKey())
-
.thenAcceptBoth(metaStorageManager.appliedRevision(META_STORAGE_WATCH_ID),
(vaultEntry, appliedRevision) -> {
+ .thenAccept(vaultEntry -> {
if (!busyLock.enterBusy()) {
throw new
IgniteInternalException(NODE_STOPPING_ERR, new NodeStoppingException());
}
try {
+ long appliedRevision =
metaStorageManager.appliedRevision();
+
if (vaultEntry != null && vaultEntry.value() !=
null) {
logicalTopology =
fromBytes(vaultEntry.value());
@@ -931,11 +931,6 @@ public class DistributionZoneManager implements
IgniteComponent {
private WatchListener createMetastorageListener() {
return new WatchListener() {
- @Override
- public String id() {
- return META_STORAGE_WATCH_ID;
- }
-
@Override
public CompletableFuture<Void> onUpdate(WatchEvent evt) {
if (!busyLock.enterBusy()) {
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
index 82185aa6bd..7807aaa687 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerLogicalTopologyEventsTest.java
@@ -108,8 +108,6 @@ public class
DistributionZoneManagerLogicalTopologyEventsTest {
MetaStorageManager metaStorageManager = mock(MetaStorageManager.class);
-
when(metaStorageManager.appliedRevision(any())).thenReturn(completedFuture(0L));
-
cmgManager = mock(ClusterManagementGroupManager.class);
clusterStateStorage = new TestClusterStateStorage();
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
index 0bd9fa3878..827aff1c9b 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerScaleUpTest.java
@@ -1600,7 +1600,7 @@ public class DistributionZoneManagerScaleUpTest {
}
private void mockVaultAppliedRevision(long revision) {
-
when(metaStorageManager.appliedRevision(any())).thenReturn(completedFuture(revision));
+ when(metaStorageManager.appliedRevision()).thenReturn(revision);
}
private void watchListenerOnUpdate(Set<String> nodes, long rev) {
diff --git
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java
index 84f1a807e6..7be638fec1 100644
---
a/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java
+++
b/modules/distribution-zones/src/test/java/org/apache/ignite/internal/distributionzones/DistributionZoneManagerWatchListenerTest.java
@@ -333,7 +333,7 @@ public class DistributionZoneManagerWatchListenerTest
extends IgniteAbstractTest
}
private void mockVaultAppliedRevision(long revision) {
-
when(metaStorageManager.appliedRevision(any())).thenReturn(completedFuture(revision));
+ when(metaStorageManager.appliedRevision()).thenReturn(revision);
}
private void mockCmgLocalNodes() {
diff --git
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
index f4fa35bb2c..d49a7d49ef 100644
---
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
+++
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/MetaStorageManager.java
@@ -40,11 +40,9 @@ import org.jetbrains.annotations.Nullable;
public interface MetaStorageManager extends IgniteComponent {
/**
* Returns the current <em>applied revision</em> of the Meta Storage, that
is, the most recent revision of updates that have been
- * processed by a particular Watch on this node.
- *
- * @param watchId ID of the watch that this revision corresponds to.
+ * processed by all Watches on this node.
*/
- CompletableFuture<Long> appliedRevision(String watchId);
+ long appliedRevision();
/**
* Retrieves an entry for the given key.
diff --git
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/WatchEvent.java
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/WatchEvent.java
index 8dc49fcd98..33130cb77f 100644
---
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/WatchEvent.java
+++
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/WatchEvent.java
@@ -39,8 +39,8 @@ public class WatchEvent {
* @param entryEvts Events for entries corresponding to an update under
one revision.
* @param revision Revision of the updated entries.
*/
- public WatchEvent(List<EntryEvent> entryEvts, long revision) {
- this.entryEvts = entryEvts;
+ public WatchEvent(Collection<EntryEvent> entryEvts, long revision) {
+ this.entryEvts = List.copyOf(entryEvts);
this.revision = revision;
}
@@ -77,6 +77,8 @@ public class WatchEvent {
* @return Entry event.
*/
public EntryEvent entryEvent() {
+ assert single();
+
return entryEvts.get(0);
}
diff --git
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/WatchListener.java
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/WatchListener.java
index 665ac765a0..95696b971f 100644
---
a/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/WatchListener.java
+++
b/modules/metastorage-api/src/main/java/org/apache/ignite/internal/metastorage/WatchListener.java
@@ -23,12 +23,6 @@ import java.util.concurrent.CompletableFuture;
* The listener which receives and handles watch updates.
*/
public interface WatchListener {
- /**
- * Returns a unique identifier for this Watch. This identifier should
never change between node restarts and must uniquely identify
- * a Watch among all Watches on a local node.
- */
- String id();
-
/**
* The method will be called on each meta storage update.
*
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
index c37073a221..8952da63d9 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageManagerImplTest.java
@@ -27,6 +27,7 @@ import static
org.apache.ignite.internal.testframework.matchers.CompletableFutur
import static org.apache.ignite.utils.ClusterServiceTestUtils.clusterService;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.contains;
+import static org.hamcrest.Matchers.is;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.jupiter.api.Assertions.assertTrue;
import static org.mockito.Mockito.mock;
@@ -189,7 +190,7 @@ public class ItMetaStorageManagerImplTest extends
IgniteAbstractTest {
assertThat(vaultManager.get(key1), willBe(nullValue()));
assertThat(vaultManager.get(key2), willBe(nullValue()));
- metaStorageManager.registerExactWatch(key1, new NoOpListener("test1"));
+ metaStorageManager.registerExactWatch(key1, new NoOpListener());
invokeFuture = metaStorageManager.invoke(
Conditions.exists(new ByteArray("foo")),
@@ -202,16 +203,15 @@ public class ItMetaStorageManagerImplTest extends
IgniteAbstractTest {
assertThat(invokeFuture, willBe(true));
- assertTrue(waitForCondition(() ->
metaStorageManager.appliedRevision("test1").join() == 2, 10_000));
+ assertTrue(waitForCondition(() -> metaStorageManager.appliedRevision()
== 2, 10_000));
// Expect that only the watched key is persisted.
assertThat(vaultManager.get(key1).thenApply(VaultEntry::value),
willBe(value));
assertThat(vaultManager.get(key2), willBe(nullValue()));
- metaStorageManager.registerExactWatch(key2, new NoOpListener("test2"));
+ metaStorageManager.registerExactWatch(key2, new NoOpListener());
- assertThat(metaStorageManager.appliedRevision("test1"), willBe(2L));
- assertThat(metaStorageManager.appliedRevision("test2"), willBe(0L));
+ assertThat(metaStorageManager.appliedRevision(), is(2L));
byte[] newValue = "newValue".getBytes(StandardCharsets.UTF_8);
@@ -226,25 +226,13 @@ public class ItMetaStorageManagerImplTest extends
IgniteAbstractTest {
assertThat(invokeFuture, willBe(true));
- assertTrue(waitForCondition(() ->
metaStorageManager.appliedRevision("test1").join() == 3, 10_000));
- assertTrue(waitForCondition(() ->
metaStorageManager.appliedRevision("test2").join() == 3, 10_000));
+ assertTrue(waitForCondition(() -> metaStorageManager.appliedRevision()
== 3, 10_000));
assertThat(vaultManager.get(key1).thenApply(VaultEntry::value),
willBe(newValue));
assertThat(vaultManager.get(key2).thenApply(VaultEntry::value),
willBe(newValue));
}
private static class NoOpListener implements WatchListener {
- private final String id;
-
- NoOpListener(String id) {
- this.id = id;
- }
-
- @Override
- public String id() {
- return id;
- }
-
@Override
public CompletableFuture<Void> onUpdate(WatchEvent event) {
return completedFuture(null);
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java
index 9a90c3d8cf..c3d358dfea 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageMultipleNodesTest.java
@@ -249,11 +249,6 @@ public class ItMetaStorageMultipleNodesTest extends
IgniteAbstractTest {
var awaitFuture = new CompletableFuture<EntryEvent>();
secondNode.metaStorageManager.registerExactWatch(key, new
WatchListener() {
- @Override
- public String id() {
- return "test";
- }
-
@Override
public CompletableFuture<Void> onUpdate(WatchEvent event) {
// Skip the first update event, because it's not guaranteed to
arrive here (insert may have happened before the watch was
diff --git
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
index e5e758f862..3d3bf01197 100644
---
a/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
+++
b/modules/metastorage/src/integrationTest/java/org/apache/ignite/internal/metastorage/impl/ItMetaStorageWatchTest.java
@@ -205,11 +205,6 @@ public class ItMetaStorageWatchTest extends
IgniteAbstractTest {
@Test
void testExactWatch() throws Exception {
testWatches((node, latch) ->
node.metaStorageManager.registerExactWatch(new ByteArray("foo"), new
WatchListener() {
- @Override
- public String id() {
- return "test";
- }
-
@Override
public CompletableFuture<Void> onUpdate(WatchEvent event) {
assertThat(event.entryEvent().newEntry().key(),
is("foo".getBytes(StandardCharsets.UTF_8)));
@@ -230,11 +225,6 @@ public class ItMetaStorageWatchTest extends
IgniteAbstractTest {
@Test
void testPrefixWatch() throws Exception {
testWatches((node, latch) ->
node.metaStorageManager.registerPrefixWatch(new ByteArray("fo"), new
WatchListener() {
- @Override
- public String id() {
- return "test";
- }
-
@Override
public CompletableFuture<Void> onUpdate(WatchEvent event) {
assertThat(event.entryEvent().newEntry().key(),
is("foo".getBytes(StandardCharsets.UTF_8)));
@@ -259,11 +249,6 @@ public class ItMetaStorageWatchTest extends
IgniteAbstractTest {
var endRange = new ByteArray("foz");
node.metaStorageManager.registerRangeWatch(startRange, endRange,
new WatchListener() {
- @Override
- public String id() {
- return "test";
- }
-
@Override
public CompletableFuture<Void> onUpdate(WatchEvent event) {
assertThat(event.entryEvent().newEntry().key(),
is("foo".getBytes(StandardCharsets.UTF_8)));
@@ -322,11 +307,6 @@ public class ItMetaStorageWatchTest extends
IgniteAbstractTest {
for (Node node : nodes) {
node.metaStorageManager.registerExactWatch(new ByteArray("foo"),
new WatchListener() {
- @Override
- public String id() {
- return "test1";
- }
-
@Override
public CompletableFuture<Void> onUpdate(WatchEvent event) {
assertThat(event.entryEvent().newEntry().key(),
is("foo".getBytes(StandardCharsets.UTF_8)));
@@ -344,11 +324,6 @@ public class ItMetaStorageWatchTest extends
IgniteAbstractTest {
});
node.metaStorageManager.registerPrefixWatch(new ByteArray("ba"),
new WatchListener() {
- @Override
- public String id() {
- return "test2";
- }
-
@Override
public CompletableFuture<Void> onUpdate(WatchEvent event) {
List<String> keys = event.entryEvents().stream()
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
index d1fe91c36e..072348445e 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/impl/MetaStorageManagerImpl.java
@@ -21,13 +21,17 @@ import static
java.util.concurrent.CompletableFuture.completedFuture;
import static org.apache.ignite.internal.util.ByteUtils.bytesToLong;
import static org.apache.ignite.internal.util.ByteUtils.longToBytes;
import static org.apache.ignite.internal.util.IgniteUtils.cancelOrConsume;
+import static
org.apache.ignite.lang.ErrorGroups.MetaStorage.RESTORING_STORAGE_ERR;
import java.util.Collection;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.ExecutionException;
import java.util.concurrent.Flow.Publisher;
import java.util.concurrent.Flow.Subscriber;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import
org.apache.ignite.internal.cluster.management.ClusterManagementGroupManager;
import
org.apache.ignite.internal.cluster.management.topology.api.LogicalTopologyService;
@@ -41,6 +45,7 @@ import org.apache.ignite.internal.metastorage.dsl.Condition;
import org.apache.ignite.internal.metastorage.dsl.Iif;
import org.apache.ignite.internal.metastorage.dsl.Operation;
import org.apache.ignite.internal.metastorage.dsl.StatementResult;
+import org.apache.ignite.internal.metastorage.exceptions.MetaStorageException;
import org.apache.ignite.internal.metastorage.server.KeyValueStorage;
import
org.apache.ignite.internal.metastorage.server.raft.MetaStorageLearnerListener;
import org.apache.ignite.internal.metastorage.server.raft.MetaStorageListener;
@@ -53,6 +58,7 @@ import org.apache.ignite.internal.raft.RaftNodeId;
import org.apache.ignite.internal.raft.service.RaftGroupService;
import org.apache.ignite.internal.util.IgniteSpinBusyLock;
import org.apache.ignite.internal.util.IgniteUtils;
+import org.apache.ignite.internal.vault.VaultEntry;
import org.apache.ignite.internal.vault.VaultManager;
import org.apache.ignite.lang.ByteArray;
import org.apache.ignite.lang.NodeStoppingException;
@@ -77,7 +83,7 @@ public class MetaStorageManagerImpl implements
MetaStorageManager {
/**
* Special key for the Vault where the applied revision is stored.
*/
- private static final String APPLIED_REV_PREFIX = "applied_revision_";
+ private static final ByteArray APPLIED_REV_KEY = new
ByteArray("applied_revision");
private final ClusterService clusterService;
@@ -103,6 +109,8 @@ public class MetaStorageManagerImpl implements
MetaStorageManager {
/** Prevents double stopping of the component. */
private final AtomicBoolean isStopped = new AtomicBoolean();
+ private volatile long appliedRevision;
+
/**
* The constructor.
*
@@ -176,6 +184,8 @@ public class MetaStorageManagerImpl implements
MetaStorageManager {
public void start() {
storage.start();
+ appliedRevision = readRevisionFromVault();
+
cmgMgr.metaStorageNodes()
.thenCompose(metaStorageNodes -> {
if (!busyLock.enterBusy()) {
@@ -197,6 +207,16 @@ public class MetaStorageManagerImpl implements
MetaStorageManager {
});
}
+ private long readRevisionFromVault() {
+ try {
+ VaultEntry entry = vaultMgr.get(APPLIED_REV_KEY).get(10,
TimeUnit.SECONDS);
+
+ return entry == null ? 0L : bytesToLong(entry.value());
+ } catch (InterruptedException | ExecutionException | TimeoutException
e) {
+ throw new MetaStorageException(RESTORING_STORAGE_ERR, e);
+ }
+ }
+
@Override
public void stop() throws Exception {
if (!isStopped.compareAndSet(false, true)) {
@@ -214,32 +234,23 @@ public class MetaStorageManagerImpl implements
MetaStorageManager {
}
@Override
- public CompletableFuture<Long> appliedRevision(String watchId) {
- return vaultMgr.get(appliedRevisionKey(watchId))
- .thenApply(appliedRevision -> appliedRevision == null ? 0L :
bytesToLong(appliedRevision.value()));
- }
-
- private long appliedRevision(WatchListener lsnr) {
- return appliedRevision(lsnr.id()).join();
- }
-
- private static ByteArray appliedRevisionKey(String watchId) {
- return ByteArray.fromString(APPLIED_REV_PREFIX + watchId);
+ public long appliedRevision() {
+ return appliedRevision;
}
@Override
public void registerPrefixWatch(ByteArray key, WatchListener listener) {
- storage.watchPrefix(key.bytes(), appliedRevision(listener) + 1,
listener);
+ storage.watchPrefix(key.bytes(), appliedRevision() + 1, listener);
}
@Override
public void registerExactWatch(ByteArray key, WatchListener listener) {
- storage.watchExact(key.bytes(), appliedRevision(listener) + 1,
listener);
+ storage.watchExact(key.bytes(), appliedRevision() + 1, listener);
}
@Override
public void registerRangeWatch(ByteArray keyFrom, @Nullable ByteArray
keyTo, WatchListener listener) {
- storage.watchRange(keyFrom.bytes(), keyTo == null ? null :
keyTo.bytes(), appliedRevision(listener) + 1, listener);
+ storage.watchRange(keyFrom.bytes(), keyTo == null ? null :
keyTo.bytes(), appliedRevision() + 1, listener);
}
@Override
@@ -570,7 +581,7 @@ public class MetaStorageManagerImpl implements
MetaStorageManager {
/**
* Saves processed Meta Storage revision and corresponding entries to the
Vault.
*/
- private CompletableFuture<Void> saveUpdatedEntriesToVault(String watchId,
WatchEvent watchEvent) {
+ private CompletableFuture<Void> saveUpdatedEntriesToVault(WatchEvent
watchEvent) {
if (!busyLock.enterBusy()) {
LOG.info("Skipping applying MetaStorage revision because the node
is stopping");
@@ -578,17 +589,21 @@ public class MetaStorageManagerImpl implements
MetaStorageManager {
}
try {
+ CompletableFuture<Void> saveToVaultFuture;
+
if (watchEvent.entryEvents().isEmpty()) {
- return vaultMgr.put(appliedRevisionKey(watchId),
longToBytes(watchEvent.revision()));
+ saveToVaultFuture = vaultMgr.put(APPLIED_REV_KEY,
longToBytes(watchEvent.revision()));
} else {
Map<ByteArray, byte[]> batch =
IgniteUtils.newHashMap(watchEvent.entryEvents().size() + 1);
- batch.put(appliedRevisionKey(watchId),
longToBytes(watchEvent.revision()));
+ batch.put(APPLIED_REV_KEY, longToBytes(watchEvent.revision()));
watchEvent.entryEvents().forEach(e -> batch.put(new
ByteArray(e.newEntry().key()), e.newEntry().value()));
- return vaultMgr.putAll(batch);
+ saveToVaultFuture = vaultMgr.putAll(batch);
}
+
+ return saveToVaultFuture.thenRun(() -> appliedRevision =
watchEvent.revision());
} finally {
busyLock.leaveBusy();
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/OnRevisionAppliedCallback.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/OnRevisionAppliedCallback.java
index 155ffb5b69..0ce5b39072 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/OnRevisionAppliedCallback.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/OnRevisionAppliedCallback.java
@@ -26,11 +26,10 @@ import org.apache.ignite.internal.metastorage.WatchEvent;
@FunctionalInterface
public interface OnRevisionAppliedCallback {
/**
- * Notifies of completion of processing of Meta Storage watches for a
particular Watch and revision.
+ * Notifies of completion of processing of Meta Storage watches for a
particular revision.
*
- * @param watchId ID of the Watch that has finished processing the
revision.
- * @param watchEvent Event with modified Meta Storage entries processed by
the Watch.
+ * @param watchEvent Event with modified Meta Storage entries processed by at
least one Watch.
* @return Future that represents the state of the execution of the
callback.
*/
- CompletableFuture<Void> onRevisionApplied(String watchId, WatchEvent
watchEvent);
+ CompletableFuture<Void> onRevisionApplied(WatchEvent watchEvent);
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/Watch.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/Watch.java
index 42c35ec9e3..33da407c6c 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/Watch.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/Watch.java
@@ -26,8 +26,8 @@ import org.apache.ignite.internal.metastorage.WatchListener;
* Subscription on updates of Meta Storage entries corresponding to a subset
of keys, starting from a given revision number.
*/
public class Watch {
- /** Current revision. */
- private volatile long targetRevision;
+ /** Minimum revision of entries that this Watch must be notified of. */
+ private final long startRevision;
/** Key predicate. */
private final Predicate<byte[]> predicate;
@@ -45,7 +45,7 @@ public class Watch {
public Watch(long startRevision, WatchListener listener, Predicate<byte[]>
predicate) {
this.predicate = predicate;
this.listener = listener;
- this.targetRevision = startRevision;
+ this.startRevision = startRevision;
}
/**
@@ -54,8 +54,8 @@ public class Watch {
* @param key Meta Storage key.
* @param revision Revision corresponding to the given {@code key}.
*/
- public boolean matches(byte[] key, long revision) {
- return revision >= targetRevision && predicate.test(key);
+ boolean matches(byte[] key, long revision) {
+ return revision >= startRevision && predicate.test(key);
}
/**
@@ -63,9 +63,7 @@ public class Watch {
*
* @see WatchListener#onUpdate
*/
- public CompletableFuture<Void> onUpdate(WatchEvent event) {
- targetRevision = event.revision() + 1;
-
+ CompletableFuture<Void> onUpdate(WatchEvent event) {
return listener.onUpdate(event);
}
@@ -74,56 +72,28 @@ public class Watch {
*
* @see WatchListener#onRevisionUpdated
*/
- public CompletableFuture<Void> onRevisionUpdated(long revision) {
- targetRevision = revision + 1;
-
+ CompletableFuture<Void> onRevisionUpdated(long revision) {
return listener.onRevisionUpdated(revision);
}
/**
* Callback that gets called if an error has occurred during the event
processing.
*/
- public void onError(Throwable e) {
+ void onError(Throwable e) {
listener.onError(e);
}
- /**
- * Returns the ID of the Watch.
- */
- public String id() {
- return listener.id();
- }
-
/**
* Returns the event listener.
*/
- public WatchListener listener() {
+ WatchListener listener() {
return listener;
}
/**
- * Returns the current Meta Storage revision this Watch is listening to.
+ * Returns the minimum Meta Storage revision this Watch is listening to.
*/
- public long targetRevision() {
- return targetRevision;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) {
- return true;
- }
- if (o == null || getClass() != o.getClass()) {
- return false;
- }
-
- Watch watch = (Watch) o;
-
- return id().equals(watch.id());
- }
-
- @Override
- public int hashCode() {
- return id().hashCode();
+ long startRevision() {
+ return startRevision;
}
}
diff --git
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
index 6189c12cee..aac4330e29 100644
---
a/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
+++
b/modules/metastorage/src/main/java/org/apache/ignite/internal/metastorage/server/WatchProcessor.java
@@ -17,15 +17,17 @@
package org.apache.ignite.internal.metastorage.server;
+import static java.util.concurrent.CompletableFuture.allOf;
import static java.util.concurrent.CompletableFuture.completedFuture;
-import static java.util.concurrent.CompletableFuture.failedFuture;
+import static java.util.concurrent.CompletableFuture.supplyAsync;
import java.util.ArrayList;
+import java.util.HashSet;
import java.util.List;
import java.util.OptionalLong;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
@@ -43,7 +45,8 @@ import org.apache.ignite.internal.util.IgniteUtils;
* Class for storing and notifying Meta Storage Watches.
*
* <p>Every Meta Storage update is processed by each registered Watch in
parallel, however notifications for a single Watch are
- * linearised (Watches are always notified of one event at a time and in
increasing order of revisions).
+ * linearised (Watches are always notified of one event at a time and in
increasing order of revisions). It is also guaranteed that
+ * Watches will not get notified of a new revision until all Watches have
finished processing a previous revision.
*/
public class WatchProcessor implements ManuallyCloseable {
/** Reads an entry from the storage using a given key and revision. */
@@ -55,7 +58,15 @@ public class WatchProcessor implements ManuallyCloseable {
private static final IgniteLogger LOG =
Loggers.forClass(WatchProcessor.class);
/** Map that contains Watches and corresponding Watch notification process
(represented as a CompletableFuture). */
- private final ConcurrentMap<Watch, CompletableFuture<Void>> watches = new
ConcurrentHashMap<>();
+ private final List<Watch> watches = new CopyOnWriteArrayList<>();
+
+ /**
+ * Future that represents the process of notifying registered Watches
about a Meta Storage revision.
+ *
+ * <p>Since Watches are notified concurrently, this future is used to
guarantee that no Watches get notified of a new revision,
+ * until all Watches have finished processing the previous revision.
+ */
+ private volatile CompletableFuture<Void> notificationFuture =
completedFuture(null);
private final EntryReader entryReader;
@@ -78,22 +89,20 @@ public class WatchProcessor implements ManuallyCloseable {
/** Adds a watch. */
public void addWatch(Watch watch) {
- assert !watches.containsKey(watch) : "Watch with id \"" + watch.id() +
"\" already exists";
-
- watches.put(watch, completedFuture(null));
+ watches.add(watch);
}
/** Removes a watch (identified by its listener). */
public void removeWatch(WatchListener listener) {
- watches.keySet().removeIf(watch -> watch.listener() == listener);
+ watches.removeIf(watch -> watch.listener() == listener);
}
/**
* Returns the minimal target revision of all registered watches.
*/
public OptionalLong minWatchRevision() {
- return watches.keySet().stream()
- .mapToLong(Watch::targetRevision)
+ return watches.stream()
+ .mapToLong(Watch::startRevision)
.min();
}
@@ -110,61 +119,96 @@ public class WatchProcessor implements ManuallyCloseable {
* Notifies registered watch about an update event.
*/
public void notifyWatches(List<Entry> updatedEntries) {
- watches.replaceAll((watch, watchOperation) ->
- watchOperation.thenComposeAsync(v -> notifyWatch(watch,
updatedEntries), watchExecutor));
+ notificationFuture = notificationFuture
+ .thenComposeAsync(v -> {
+ // Revision must be the same for all entries.
+ long newRevision = updatedEntries.get(0).revision();
+
+ // Notify all watches in parallel, then aggregate the
entries that they have processed.
+ CompletableFuture<List<EntryEvent>>[] notificationFutures
= watches.stream()
+ .map(watch -> notifyWatch(watch, updatedEntries,
newRevision))
+ .toArray(CompletableFuture[]::new);
+
+ return allOf(notificationFutures)
+ .thenComposeAsync(ignored ->
invokeOnRevisionCallback(notificationFutures, newRevision), watchExecutor);
+ }, watchExecutor);
}
- private CompletableFuture<Void> notifyWatch(Watch watch, List<Entry>
updatedEntries) {
- // Revision must be the same for all entries.
- long newRevision = updatedEntries.get(0).revision();
+ private CompletableFuture<List<EntryEvent>> notifyWatch(Watch watch,
List<Entry> updatedEntries, long revision) {
+ CompletableFuture<List<EntryEvent>> eventFuture = supplyAsync(() -> {
+ List<EntryEvent> entryEvents = List.of();
- List<EntryEvent> entryEvents = List.of();
+ for (Entry newEntry : updatedEntries) {
+ byte[] newKey = newEntry.key();
- for (Entry newEntry : updatedEntries) {
- byte[] newKey = newEntry.key();
+ assert newEntry.revision() == revision;
- assert newEntry.revision() == newRevision;
+ if (watch.matches(newKey, revision)) {
+ Entry oldEntry = entryReader.get(newKey, revision - 1);
- if (watch.matches(newKey, newRevision)) {
- Entry oldEntry = entryReader.get(newKey, newRevision - 1);
+ if (entryEvents.isEmpty()) {
+ entryEvents = new ArrayList<>();
+ }
- if (entryEvents.isEmpty()) {
- entryEvents = new ArrayList<>();
+ entryEvents.add(new EntryEvent(oldEntry, newEntry));
}
-
- entryEvents.add(new EntryEvent(oldEntry, newEntry));
}
- }
- var event = new WatchEvent(entryEvents, newRevision);
+ return entryEvents;
+ }, watchExecutor);
- CompletableFuture<Void> eventNotificationFuture;
+ return eventFuture
+ .thenCompose(entryEvents -> {
+ CompletableFuture<Void> eventNotificationFuture =
entryEvents.isEmpty()
+ ? watch.onRevisionUpdated(revision)
+ : watch.onUpdate(new WatchEvent(entryEvents,
revision));
- try {
- eventNotificationFuture = entryEvents.isEmpty()
- ? watch.onRevisionUpdated(newRevision)
- : watch.onUpdate(event);
- } catch (Throwable e) {
- eventNotificationFuture = failedFuture(e);
- }
-
- return eventNotificationFuture
+ return eventNotificationFuture.thenApply(v -> entryEvents);
+ })
.whenComplete((v, e) -> {
if (e != null) {
+ if (e instanceof CompletionException) {
+ e = e.getCause();
+ }
+
// TODO: IGNITE-14693 Implement Meta storage exception
handling logic.
- LOG.error("Error occurred when processing a watch
event {}, watch {} is going to be stopped", e, event, watch.id());
+ LOG.error("Error occurred when processing a watch
event", e);
watch.onError(e);
-
- watches.remove(watch);
}
- })
- .thenCompose(v ->
revisionCallback.onRevisionApplied(watch.id(), event));
+ });
+ }
+
+ private CompletableFuture<Void>
invokeOnRevisionCallback(CompletableFuture<List<EntryEvent>>[]
notificationFutures, long revision) {
+ try {
+ // Only notify about entries that have been accepted by at least
one Watch.
+ var acceptedEntries = new HashSet<EntryEvent>();
+
+ for (CompletableFuture<List<EntryEvent>> future :
notificationFutures) {
+ // This method must only be invoked when all passed futures
have been completed.
+ assert future.isDone();
+
+ acceptedEntries.addAll(future.join());
+ }
+
+ var event = new WatchEvent(acceptedEntries, revision);
+
+ return revisionCallback.onRevisionApplied(event)
+ .whenComplete((ignored, e) -> {
+ if (e != null) {
+ LOG.error("Error occurred when notifying watches",
e);
+ }
+ });
+ } catch (Throwable e) {
+ LOG.error("Error occurred when notifying watches", e);
+
+ throw e;
+ }
}
@Override
public void close() {
- watches.values().forEach(f -> f.cancel(true));
+ notificationFuture.cancel(true);
IgniteUtils.shutdownAndAwaitTermination(watchExecutor, 10,
TimeUnit.SECONDS);
}
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java
index 4cd848247b..12a156eea9 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/AbstractKeyValueStorageTest.java
@@ -36,8 +36,8 @@ import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.never;
import static org.mockito.Mockito.timeout;
-import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
@@ -2087,16 +2087,11 @@ public abstract class AbstractKeyValueStorageTest {
long appliedRevision = storage.revision();
- storage.startWatches((revision, updatedEntries) ->
completedFuture(null));
+ storage.startWatches(event -> completedFuture(null));
CompletableFuture<byte[]> fut = new CompletableFuture<>();
storage.watchExact(key(0), appliedRevision + 1, new WatchListener() {
- @Override
- public String id() {
- return "test";
- }
-
@Override
public CompletableFuture<Void> onUpdate(WatchEvent event) {
fut.complete(event.entryEvent().newEntry().value());
@@ -2415,13 +2410,10 @@ public abstract class AbstractKeyValueStorageTest {
WatchListener mockListener2 = mock(WatchListener.class);
WatchListener mockListener3 = mock(WatchListener.class);
- when(mockListener1.id()).thenReturn("test1");
when(mockListener1.onUpdate(any())).thenReturn(completedFuture(null));
- when(mockListener2.id()).thenReturn("test2");
when(mockListener2.onUpdate(any())).thenReturn(completedFuture(null));
- when(mockListener3.id()).thenReturn("test3");
when(mockListener3.onUpdate(any())).thenReturn(completedFuture(null));
var exception = new IllegalStateException();
@@ -2434,7 +2426,7 @@ public abstract class AbstractKeyValueStorageTest {
OnRevisionAppliedCallback mockCallback =
mock(OnRevisionAppliedCallback.class);
- when(mockCallback.onRevisionApplied(any(),
any())).thenReturn(completedFuture(null));
+
when(mockCallback.onRevisionApplied(any())).thenReturn(completedFuture(null));
storage.startWatches(mockCallback);
@@ -2446,7 +2438,7 @@ public abstract class AbstractKeyValueStorageTest {
verify(mockListener3, timeout(10_000)).onUpdate(any());
- verify(mockCallback, times(2)).onRevisionApplied(any(), any());
+ verify(mockCallback, never()).onRevisionApplied(any());
}
private static void fill(KeyValueStorage storage, int keySuffix, int num) {
@@ -2489,11 +2481,6 @@ public abstract class AbstractKeyValueStorageTest {
var resultFuture = new CompletableFuture<Void>();
watchMethod.accept(new WatchListener() {
- @Override
- public String id() {
- return "test";
- }
-
@Override
public CompletableFuture<Void> onUpdate(WatchEvent event) {
try {
@@ -2519,7 +2506,7 @@ public abstract class AbstractKeyValueStorageTest {
}
});
- storage.startWatches((revision, updatedEntries) ->
completedFuture(null));
+ storage.startWatches(event -> completedFuture(null));
return resultFuture;
}
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
index 51cc7dac6d..bb421f2438 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/RocksDbKeyValueStorageTest.java
@@ -70,11 +70,6 @@ public class RocksDbKeyValueStorageTest extends
AbstractKeyValueStorageTest {
var latch = new CountDownLatch(2);
storage.watchExact("foo".getBytes(UTF_8), 1, new WatchListener() {
- @Override
- public String id() {
- return "test1";
- }
-
@Override
public CompletableFuture<Void> onUpdate(WatchEvent event) {
assertThat(event.entryEvent().newEntry().value(),
is("bar".getBytes(UTF_8)));
@@ -91,11 +86,6 @@ public class RocksDbKeyValueStorageTest extends
AbstractKeyValueStorageTest {
});
storage.watchExact("baz".getBytes(UTF_8), 1, new WatchListener() {
- @Override
- public String id() {
- return "test2";
- }
-
@Override
public CompletableFuture<Void> onUpdate(WatchEvent event) {
assertThat(event.entryEvent().newEntry().value(),
is("quux".getBytes(UTF_8)));
@@ -111,7 +101,7 @@ public class RocksDbKeyValueStorageTest extends
AbstractKeyValueStorageTest {
}
});
- storage.startWatches((revision, updatedEntries) ->
CompletableFuture.completedFuture(null));
+ storage.startWatches(event -> CompletableFuture.completedFuture(null));
storage.restoreSnapshot(snapshotPath);
diff --git
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java
index 9f43d28e53..1e64ecd803 100644
---
a/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java
+++
b/modules/metastorage/src/test/java/org/apache/ignite/internal/metastorage/server/WatchProcessorTest.java
@@ -19,9 +19,11 @@ package org.apache.ignite.internal.metastorage.server;
import static java.nio.charset.StandardCharsets.UTF_8;
import static java.util.concurrent.CompletableFuture.completedFuture;
+import static org.hamcrest.MatcherAssert.assertThat;
+import static org.hamcrest.Matchers.containsInAnyOrder;
+import static org.hamcrest.Matchers.is;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyLong;
-import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
@@ -31,7 +33,6 @@ import static org.mockito.Mockito.when;
import java.util.Arrays;
import java.util.List;
-import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.internal.metastorage.Entry;
import org.apache.ignite.internal.metastorage.EntryEvent;
@@ -41,6 +42,7 @@ import org.apache.ignite.internal.metastorage.impl.EntryImpl;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
+import org.mockito.ArgumentCaptor;
import org.mockito.InOrder;
/**
@@ -53,7 +55,7 @@ public class WatchProcessorTest {
@BeforeEach
void setUp() {
- when(revisionCallback.onRevisionApplied(any(),
any())).thenReturn(completedFuture(null));
+
when(revisionCallback.onRevisionApplied(any())).thenReturn(completedFuture(null));
watchProcessor.setRevisionCallback(revisionCallback);
}
@@ -79,14 +81,23 @@ public class WatchProcessorTest {
watchProcessor.notifyWatches(List.of(entry1, entry2));
- verify(listener1, timeout(1_000)).onUpdate(new WatchEvent(new
EntryEvent(oldEntry(entry1), entry1)));
- verify(listener2, timeout(1_000)).onUpdate(new WatchEvent(new
EntryEvent(oldEntry(entry2), entry2)));
+ var entryEvent1 = new EntryEvent(oldEntry(entry1), entry1);
+ var entryEvent2 = new EntryEvent(oldEntry(entry2), entry2);
+
+ verify(listener1, timeout(1_000)).onUpdate(new
WatchEvent(entryEvent1));
+ verify(listener2, timeout(1_000)).onUpdate(new
WatchEvent(entryEvent2));
verify(listener1, never()).onRevisionUpdated(anyLong());
verify(listener2, never()).onRevisionUpdated(anyLong());
- // Revision callback should be called for every listener update.
- verify(revisionCallback,
timeout(1_000).times(2)).onRevisionApplied(any(), any());
+ var watchEventCaptor = ArgumentCaptor.forClass(WatchEvent.class);
+
+ verify(revisionCallback,
timeout(1_000)).onRevisionApplied(watchEventCaptor.capture());
+
+ WatchEvent event = watchEventCaptor.getValue();
+
+ assertThat(event.entryEvents(), containsInAnyOrder(entryEvent1,
entryEvent2));
+ assertThat(event.revision(), is(1L));
}
/**
@@ -105,20 +116,25 @@ public class WatchProcessorTest {
watchProcessor.notifyWatches(List.of(entry1));
- verify(listener1, timeout(1_000)).onUpdate(new WatchEvent(new
EntryEvent(oldEntry(entry1), entry1)));
+ var event = new WatchEvent(new EntryEvent(oldEntry(entry1), entry1));
+
+ verify(listener1, timeout(1_000)).onUpdate(event);
verify(listener2, timeout(1_000)).onRevisionUpdated(1);
+ verify(revisionCallback, timeout(1_000)).onRevisionApplied(event);
+
watchProcessor.notifyWatches(List.of(entry2));
+ event = new WatchEvent(new EntryEvent(oldEntry(entry2), entry2));
+
verify(listener1, timeout(1_000)).onRevisionUpdated(2);
- verify(listener2, timeout(1_000)).onUpdate(new WatchEvent(new
EntryEvent(oldEntry(entry2), entry2)));
+ verify(listener2, timeout(1_000)).onUpdate(event);
- // Revision callback should be called for every listener update.
- verify(revisionCallback,
timeout(1_000).times(4)).onRevisionApplied(any(), any());
+ verify(revisionCallback, timeout(1_000)).onRevisionApplied(event);
}
/**
- * Tests a scenario that, when a watch throws an exception, it gets
disabled and other watches continue working.
+ * Tests a scenario in which a watch throws an exception and watch
processing finishes with an error.
*/
@Test
void testWatchFailure() {
@@ -126,7 +142,6 @@ public class WatchProcessorTest {
WatchListener listener2 = mock(WatchListener.class);
- when(listener2.id()).thenReturn("error");
when(listener2.onUpdate(any())).thenThrow(new IllegalStateException());
watchProcessor.addWatch(new Watch(0, listener1, key ->
Arrays.equals(key, "foo".getBytes(UTF_8))));
@@ -141,19 +156,7 @@ public class WatchProcessorTest {
verify(listener2, timeout(1_000)).onUpdate(new WatchEvent(new
EntryEvent(oldEntry(entry2), entry2)));
verify(listener2,
timeout(1_000)).onError(any(IllegalStateException.class));
- var entry3 = new EntryImpl("bar".getBytes(UTF_8), null, 2, 0);
-
- watchProcessor.notifyWatches(List.of(entry3));
-
- clearInvocations(listener1, listener2, revisionCallback);
-
- verify(listener1, timeout(1_000)).onRevisionUpdated(2);
-
- verify(listener2, never()).onUpdate(any());
- verify(listener2, never()).onError(any());
- verify(listener2, never()).onRevisionUpdated(anyLong());
-
- verify(revisionCallback, timeout(1_000)).onRevisionApplied(any(),
any());
+ verify(revisionCallback, never()).onRevisionApplied(any());
}
/**
@@ -166,8 +169,6 @@ public class WatchProcessorTest {
WatchListener listener2 = mock(WatchListener.class);
- when(listener2.id()).thenReturn("blocking");
-
var blockingFuture = new CompletableFuture<Void>();
when(listener2.onUpdate(any()))
@@ -191,11 +192,13 @@ public class WatchProcessorTest {
watchProcessor.notifyWatches(List.of(entry3, entry4));
- verify(listener1, timeout(1_000)).onUpdate(new WatchEvent(new
EntryEvent(oldEntry(entry3), entry3)));
+ verify(listener1, never()).onUpdate(new WatchEvent(new
EntryEvent(oldEntry(entry3), entry3)));
verify(listener2, never()).onUpdate(new WatchEvent(new
EntryEvent(oldEntry(entry4), entry4)));
blockingFuture.complete(null);
+ verify(listener1, timeout(1_000)).onUpdate(new WatchEvent(new
EntryEvent(oldEntry(entry3), entry3)));
+
InOrder inOrder = inOrder(listener2);
inOrder.verify(listener2, timeout(1_000)).onUpdate(new WatchEvent(new
EntryEvent(oldEntry(entry2), entry2)));
@@ -205,10 +208,6 @@ public class WatchProcessorTest {
private static WatchListener mockListener() {
var listener = mock(WatchListener.class);
- String id = UUID.randomUUID().toString();
-
- when(listener.id()).thenReturn(id);
-
when(listener.onUpdate(any())).thenReturn(completedFuture(null));
when(listener.onRevisionUpdated(anyLong())).thenReturn(completedFuture(null));
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
index c990e11817..71ce16672b 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/AssignmentsTracker.java
@@ -193,11 +193,6 @@ public class AssignmentsTracker {
* Meta storage assignments watch.
*/
private class AssignmentsListener implements WatchListener {
- @Override
- public String id() {
- return STABLE_ASSIGNMENTS_PREFIX + "watch";
- }
-
@Override
public CompletableFuture<Void> onUpdate(WatchEvent event) {
assert !event.entryEvent().newEntry().empty() : "New assignments
are empty";
diff --git
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseTracker.java
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseTracker.java
index ac937f3f83..4933f73593 100644
---
a/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseTracker.java
+++
b/modules/placement-driver/src/main/java/org/apache/ignite/internal/placementdriver/LeaseTracker.java
@@ -124,11 +124,6 @@ public class LeaseTracker {
* Listen lease holder updates.
*/
private class UpdateListener implements WatchListener {
- @Override
- public String id() {
- return PLACEMENTDRIVER_PREFIX + "watch";
- }
-
@Override
public CompletableFuture<Void> onUpdate(WatchEvent event) {
for (EntryEvent entry : event.entryEvents()) {
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
index 4db3c7f5d2..3f9eefb6e3 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/configuration/storage/ItDistributedConfigurationStorageTest.java
@@ -203,7 +203,7 @@ public class ItDistributedConfigurationStorageTest {
assertThat(node.cfgStorage.writeConfigurationRevision(0, 1),
willCompleteSuccessfully());
assertTrue(waitForCondition(
- () ->
node.metaStorageManager.appliedRevision(DistributedConfigurationStorage.WATCH_ID).join()
!= 0,
+ () -> node.metaStorageManager.appliedRevision() != 0,
3000
));
} finally {
diff --git
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/test/WatchListenerInhibitor.java
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/test/WatchListenerInhibitor.java
index d58cd800de..dc3f880ee1 100644
---
a/modules/runner/src/integrationTest/java/org/apache/ignite/internal/test/WatchListenerInhibitor.java
+++
b/modules/runner/src/integrationTest/java/org/apache/ignite/internal/test/WatchListenerInhibitor.java
@@ -17,14 +17,14 @@
package org.apache.ignite.internal.test;
+import static
org.apache.ignite.internal.testframework.IgniteTestUtils.getField;
import static
org.apache.ignite.internal.testframework.IgniteTestUtils.getFieldValue;
-import java.util.Map;
+import java.lang.reflect.Field;
import java.util.concurrent.CompletableFuture;
import org.apache.ignite.Ignite;
import org.apache.ignite.internal.app.IgniteImpl;
import org.apache.ignite.internal.metastorage.impl.MetaStorageManagerImpl;
-import org.apache.ignite.internal.metastorage.server.Watch;
import org.apache.ignite.internal.metastorage.server.WatchProcessor;
import
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValueStorage;
@@ -32,8 +32,9 @@ import
org.apache.ignite.internal.metastorage.server.persistence.RocksDbKeyValue
* Class for blocking Watch processing on a given Ignite node.
*/
public class WatchListenerInhibitor {
- /** "watches" field captured from the {@link RocksDbKeyValueStorage}
instance. */
- private final Map<Watch, CompletableFuture<Void>> watches;
+ private final WatchProcessor watchProcessor;
+
+ private final Field notificationFutureField;
/** Future used to block the watch notification thread. */
private final CompletableFuture<Void> inhibitFuture = new
CompletableFuture<>();
@@ -52,20 +53,25 @@ public class WatchListenerInhibitor {
var watchProcessor = (WatchProcessor) getFieldValue(storage,
RocksDbKeyValueStorage.class, "watchProcessor");
- var watches = (Map<Watch, CompletableFuture<Void>>)
getFieldValue(watchProcessor, WatchProcessor.class, "watches");
-
- return new WatchListenerInhibitor(watches);
+ return new WatchListenerInhibitor(watchProcessor);
}
- private WatchListenerInhibitor(Map<Watch, CompletableFuture<Void>>
watches) {
- this.watches = watches;
+ private WatchListenerInhibitor(WatchProcessor watchProcessor) {
+ this.watchProcessor = watchProcessor;
+ this.notificationFutureField = getField(watchProcessor,
WatchProcessor.class, "notificationFuture");
}
/**
* Starts inhibiting events.
*/
public void startInhibit() {
- watches.replaceAll((watch, watchOperation) -> watchOperation.thenCompose(v -> inhibitFuture));
+ try {
+ CompletableFuture<Void> notificationFuture = (CompletableFuture<Void>) notificationFutureField.get(watchProcessor);
+
+ notificationFutureField.set(watchProcessor, notificationFuture.thenCompose(v -> inhibitFuture));
+ } catch (IllegalAccessException e) {
+ throw new RuntimeException(e);
+ }
}
/**
diff --git a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
index 6961512d8a..f0dd8e74c1 100644
--- a/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
+++ b/modules/runner/src/main/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationStorage.java
@@ -65,8 +65,6 @@ public class DistributedConfigurationStorage implements ConfigurationStorage {
/** Prefix added to configuration keys to distinguish them in the meta storage. Must end with a dot. */
private static final String DISTRIBUTED_PREFIX = "dst-cfg.";
- public static final String WATCH_ID = DISTRIBUTED_PREFIX + "watch";
-
/**
* Key for CAS-ing configuration keys to meta storage.
*/
@@ -208,9 +206,12 @@ public class DistributedConfigurationStorage implements ConfigurationStorage {
@Override
public CompletableFuture<Data> readDataOnRecovery() throws StorageException {
- CompletableFuture<Data> future = metaStorageMgr.appliedRevision(WATCH_ID)
- .thenCombine(vaultMgr.get(CONFIGURATION_REVISIONS_KEY), DistributedConfigurationStorage::resolveRevision)
- .thenApplyAsync(this::readDataOnRecovery0, threadPool);
+ CompletableFuture<Data> future = vaultMgr.get(CONFIGURATION_REVISIONS_KEY)
+ .thenApplyAsync(entry -> {
+ long revision = resolveRevision(metaStorageMgr.appliedRevision(), entry);
+
+ return readDataOnRecovery0(revision);
+ }, threadPool);
return registerFuture(future);
}
@@ -311,11 +312,6 @@ public class DistributedConfigurationStorage implements ConfigurationStorage {
// TODO: registerPrefixWatch could throw OperationTimeoutException and CompactedException and we should
// TODO: properly handle such cases https://issues.apache.org/jira/browse/IGNITE-14604
metaStorageMgr.registerPrefixWatch(DST_KEYS_START_RANGE, new WatchListener() {
- @Override
- public String id() {
- return WATCH_ID;
- }
-
@Override
public CompletableFuture<Void> onUpdate(WatchEvent events) {
Map<String, Serializable> data = IgniteUtils.newHashMap(events.entryEvents().size() - 1);
diff --git a/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationCatchUpTest.java b/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationCatchUpTest.java
index 675fc85695..776fa792be 100644
--- a/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationCatchUpTest.java
+++ b/modules/runner/src/test/java/org/apache/ignite/internal/configuration/storage/DistributedConfigurationCatchUpTest.java
@@ -87,8 +87,7 @@ public class DistributedConfigurationCatchUpTest {
}
/**
- * Tests that distributed configuration storage correctly picks up latest configuration MetaStorage revision
- * during recovery process.
+ * Tests that distributed configuration storage correctly picks up latest configuration MetaStorage revision during recovery process.
*
* @throws Exception If failed.
*/
@@ -100,8 +99,6 @@ public class DistributedConfigurationCatchUpTest {
MetaStorageMockWrapper wrapper = new MetaStorageMockWrapper();
- when(wrapper.mock.appliedRevision(DistributedConfigurationStorage.WATCH_ID)).thenReturn(completedFuture(0L));
-
DistributedConfigurationStorage storage = storage(wrapper);
try {
@@ -130,7 +127,7 @@ public class DistributedConfigurationCatchUpTest {
vaultManager.put(MetaStorageMockWrapper.TEST_KEY, new byte[]{4, 1, 2, 3, 4}).get();
// This emulates a change in MetaStorage that is not related to the configuration.
- when(wrapper.mock.appliedRevision(DistributedConfigurationStorage.WATCH_ID)).thenReturn(completedFuture(2L));
+ when(wrapper.mock.appliedRevision()).thenReturn(2L);
storage = storage(wrapper);
@@ -163,8 +160,8 @@ public class DistributedConfigurationCatchUpTest {
private static final String DISTRIBUTED_PREFIX = "dst-cfg.";
/**
- * This and previous field are copy-pasted intentionally, so in case if something changes,
- * this test should fail and be reviewed and re-written.
+ * This and previous field are copy-pasted intentionally, so in case if something changes, this test should fail and be reviewed and
+ * re-written.
*/
private static final ByteArray MASTER_KEY = new ByteArray(DISTRIBUTED_PREFIX + "$master$key");
diff --git a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
index 03136e6def..7848b4931a 100644
--- a/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
+++ b/modules/table/src/main/java/org/apache/ignite/internal/table/distributed/TableManager.java
@@ -1837,11 +1837,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
*/
private WatchListener createDistributionZonesDataNodesListener() {
return new WatchListener() {
- @Override
- public String id() {
- return "dst-zones-data-nodes-watch";
- }
-
@Override
public CompletableFuture<Void> onUpdate(WatchEvent evt) {
if (!busyLock.enterBusy()) {
@@ -1921,11 +1916,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
*/
private WatchListener createPendingAssignmentsRebalanceListener() {
return new WatchListener() {
- @Override
- public String id() {
- return "pending-assignments-watch";
- }
-
@Override
public CompletableFuture<Void> onUpdate(WatchEvent evt) {
if (!busyLock.enterBusy()) {
@@ -2156,11 +2146,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
*/
private WatchListener createStableAssignmentsRebalanceListener() {
return new WatchListener() {
- @Override
- public String id() {
- return "stable-assignments-watch";
- }
-
@Override
public CompletableFuture<Void> onUpdate(WatchEvent evt) {
if (!busyLock.enterBusy()) {
@@ -2188,11 +2173,6 @@ public class TableManager extends Producer<TableEvent, TableEventParameters> imp
*/
private WatchListener createAssignmentsSwitchRebalanceListener() {
return new WatchListener() {
- @Override
- public String id() {
- return "assignments-switch-watch";
- }
-
@Override
public CompletableFuture<Void> onUpdate(WatchEvent evt) {
if (!busyLock.enterBusy()) {