This is an automated email from the ASF dual-hosted git repository.
ibessonov pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/ignite-3.git
The following commit(s) were added to refs/heads/main by this push:
new d9f30cf IGNITE-16361 Implemented predictable "nested" listener
behavior in configuration notifications (#589)
d9f30cf is described below
commit d9f30cf81a32893fd95b3a22bbb35f392fa1b45c
Author: Kirill Tkalenko <[email protected]>
AuthorDate: Fri Jan 28 13:50:55 2022 +0300
IGNITE-16361 Implemented predictable "nested" listener behavior in
configuration notifications (#589)
---
.../configuration/ConfigurationProperty.java | 9 +-
.../configuration/NamedConfigurationTree.java | 12 +-
.../configuration/ConfigurationChanger.java | 29 +++-
.../configuration/ConfigurationListenerHolder.java | 129 ++++++++++++++
.../internal/configuration/ConfigurationNode.java | 21 ++-
.../configuration/ConfigurationRegistry.java | 49 ++++--
.../configuration/DynamicConfigurationChanger.java | 9 +-
.../configuration/NamedListConfiguration.java | 29 ++--
.../ConfigurationNotificationContext.java | 9 +-
.../ConfigurationNotificationUtils.java | 46 +++--
.../notifications/ConfigurationNotifier.java | 186 ++++++++++++---------
.../ConfigurationListenerHolderTest.java | 122 ++++++++++++++
.../configuration/TestConfigurationChanger.java | 2 +-
.../notifications/ConfigurationListenerTest.java | 82 ++++++++-
.../testframework/ConfigurationExtension.java | 15 +-
.../ignite/internal/util/CollectionUtils.java | 113 +++++++++++--
.../ignite/internal/util/CollectionUtilsTest.java | 68 ++++++--
17 files changed, 733 insertions(+), 197 deletions(-)
diff --git
a/modules/configuration-api/src/main/java/org/apache/ignite/configuration/ConfigurationProperty.java
b/modules/configuration-api/src/main/java/org/apache/ignite/configuration/ConfigurationProperty.java
index 0556aa2..b010249 100644
---
a/modules/configuration-api/src/main/java/org/apache/ignite/configuration/ConfigurationProperty.java
+++
b/modules/configuration-api/src/main/java/org/apache/ignite/configuration/ConfigurationProperty.java
@@ -27,21 +27,20 @@ import
org.apache.ignite.configuration.notifications.ConfigurationListener;
public interface ConfigurationProperty<VIEWT> {
/**
* Get key of this node.
- *
- * @return Key.
*/
String key();
/**
* Get value of this property.
- *
- * @return Value of this property.
*/
VIEWT value();
/**
* Adds configuration values listener.
*
+ * <p>NOTE: If this method is called from another listener, then the added listener is
guaranteed to be called starting from the next configuration
+ * update only.
+ *
* @param listener Listener.
*/
void listen(ConfigurationListener<VIEWT> listener);
@@ -49,6 +48,8 @@ public interface ConfigurationProperty<VIEWT> {
/**
* Removes configuration values listener.
*
+ * <p>NOTE: The behavior is unpredictable if this method is called from inside
another listener.
+ *
* @param listener Listener.
*/
void stopListen(ConfigurationListener<VIEWT> listener);
diff --git
a/modules/configuration-api/src/main/java/org/apache/ignite/configuration/NamedConfigurationTree.java
b/modules/configuration-api/src/main/java/org/apache/ignite/configuration/NamedConfigurationTree.java
index b0a8820..5806bf3 100644
---
a/modules/configuration-api/src/main/java/org/apache/ignite/configuration/NamedConfigurationTree.java
+++
b/modules/configuration-api/src/main/java/org/apache/ignite/configuration/NamedConfigurationTree.java
@@ -23,8 +23,8 @@ import org.jetbrains.annotations.Nullable;
/**
* Configuration tree representing arbitrary set of named underlying
configuration tree of the same type.
*
- * @param <T> Type of the underlying configuration tree.
- * @param <VIEWT> Value type of the underlying node.
+ * @param <T> Type of the underlying configuration tree.
+ * @param <VIEWT> Value type of the underlying node.
* @param <CHANGET> Type of the object that changes underlying nodes values.
*/
public interface NamedConfigurationTree<T extends
ConfigurationProperty<VIEWT>, VIEWT, CHANGET extends VIEWT>
@@ -33,13 +33,15 @@ public interface NamedConfigurationTree<T extends
ConfigurationProperty<VIEWT>,
* Get named configuration by name.
*
* @param name Name.
- * @return Configuration.
*/
@Nullable T get(String name);
/**
* Add named-list-specific configuration values listener.
*
+ * <p>NOTE: If this method is called from another listener, then the added listener is
guaranteed to be called starting from the next configuration
+ * update only.
+ *
* @param listener Listener.
*/
void listenElements(ConfigurationNamedListListener<VIEWT> listener);
@@ -47,6 +49,8 @@ public interface NamedConfigurationTree<T extends
ConfigurationProperty<VIEWT>,
/**
* Removes named-list-specific configuration values listener.
*
+ * <p>NOTE: The behavior is unpredictable if this method is called from inside
another listener.
+ *
* @param listener Listener.
*/
void stopListenElements(ConfigurationNamedListListener<VIEWT> listener);
@@ -56,8 +60,6 @@ public interface NamedConfigurationTree<T extends
ConfigurationProperty<VIEWT>,
* its nested configurations.
*
* <p>NOTE: {@link ConfigurationListenOnlyException} will be thrown when
trying to get/update the configuration values.
- *
- * @return Placeholder to add listeners for any configuration.
*/
T any();
}
diff --git
a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java
b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java
index e567737..83f1d93 100644
---
a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java
+++
b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationChanger.java
@@ -51,6 +51,7 @@ import java.util.concurrent.ExecutionException;
import java.util.concurrent.ForkJoinPool;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Function;
import org.apache.ignite.configuration.ConfigurationChangeException;
import org.apache.ignite.configuration.RootKey;
@@ -97,6 +98,9 @@ public abstract class ConfigurationChanger implements
DynamicConfigurationChange
/** Storage trees. */
private volatile StorageRoots storageRoots;
+ /** Configuration listener notification counter, must be incremented
before each use of {@link #notificator}. */
+ private final AtomicLong notificationListenerCnt = new AtomicLong();
+
/**
* Closure interface to be used by the configuration changer. An instance
of this closure is passed into the constructor and invoked
* every time when there's an update from any of the storages.
@@ -108,10 +112,11 @@ public abstract class ConfigurationChanger implements
DynamicConfigurationChange
*
* @param oldRoot Old roots values. All these roots always belong to a
single storage.
* @param newRoot New values for the same roots as in {@code oldRoot}.
- * @param storageRevision Revision of the storage.
+ * @param storageRevision Configuration revision of the storage.
+ * @param notificationNumber Configuration listener notification
number.
* @return Future that must signify when processing is completed.
Exceptional completion is not expected.
*/
- CompletableFuture<Void> notify(@Nullable SuperRoot oldRoot, SuperRoot
newRoot, long storageRevision);
+ CompletableFuture<Void> notify(@Nullable SuperRoot oldRoot, SuperRoot
newRoot, long storageRevision, long notificationNumber);
}
/**
@@ -130,7 +135,7 @@ public abstract class ConfigurationChanger implements
DynamicConfigurationChange
/**
* Constructor.
*
- * @param roots Forest.
+ * @param roots Forest.
* @param version Version associated with the currently known storage
state.
*/
private StorageRoots(SuperRoot roots, long version) {
@@ -143,9 +148,9 @@ public abstract class ConfigurationChanger implements
DynamicConfigurationChange
* Constructor.
*
* @param notificator Closure to execute when update from the storage is
received.
- * @param rootKeys Configuration root keys.
- * @param validators Validators.
- * @param storage Configuration storage.
+ * @param rootKeys Configuration root keys.
+ * @param validators Validators.
+ * @param storage Configuration storage.
* @throws IllegalArgumentException If the configuration type of the root
keys is not equal to the storage type.
*/
public ConfigurationChanger(
@@ -225,7 +230,7 @@ public abstract class ConfigurationChanger implements
DynamicConfigurationChange
* Initializes the configuration storage - reads data and sets default
values for missing configuration properties.
*
* @throws ConfigurationValidationException If configuration validation
failed.
- * @throws ConfigurationChangeException If configuration framework
failed to add default values and save them to storage.
+ * @throws ConfigurationChangeException If configuration framework failed
to add default values and save them to storage.
*/
public void initializeDefaults() throws ConfigurationValidationException,
ConfigurationChangeException {
try {
@@ -555,7 +560,7 @@ public abstract class ConfigurationChanger implements
DynamicConfigurationChange
return CompletableFuture.completedFuture(null);
} else {
- return notificator.notify(oldSuperRoot, newSuperRoot, newChangeId)
+ return notificator.notify(oldSuperRoot, newSuperRoot, newChangeId,
notificationListenerCnt.incrementAndGet())
.whenComplete((v, t) -> {
if (t == null) {
oldStorageRoots.changeFuture.complete(null);
@@ -574,6 +579,12 @@ public abstract class ConfigurationChanger implements
DynamicConfigurationChange
CompletableFuture<Void> notifyCurrentConfigurationListeners() {
StorageRoots storageRoots = this.storageRoots;
- return notificator.notify(null, storageRoots.roots,
storageRoots.version);
+ return notificator.notify(null, storageRoots.roots,
storageRoots.version, notificationListenerCnt.incrementAndGet());
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long notificationCount() {
+ return notificationListenerCnt.get();
}
}
diff --git
a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationListenerHolder.java
b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationListenerHolder.java
new file mode 100644
index 0000000..ca5c8e6
--- /dev/null
+++
b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationListenerHolder.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.configuration;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.NoSuchElementException;
+import java.util.concurrent.CopyOnWriteArrayList;
+import org.jetbrains.annotations.Nullable;
+
+/**
+ * Holder (thread safe) for configuration change listeners.
+ *
+ * <p>Every listener is stored together with a notification number (see {@link #addListener}); {@link #listeners} uses that number
+ * to decide whether the listener is visible for a given notification. The backing list is a {@link CopyOnWriteArrayList}.
+ *
+ * @param <L> Type of the configuration change listener.
+ */
+class ConfigurationListenerHolder<L> {
+ private final List<Container<L>> containers = new CopyOnWriteArrayList<>();
+
+ /**
+ * Adds a listener.
+ *
+ * <p>The listener is stored with {@code notificationNumber + 1}, so {@link #listeners} returns it only when asked for a
+ * notification number that is strictly greater than the one passed here.
+ *
+ * @param listener Configuration change listener.
+ * @param notificationNumber Configuration notification listener number after which the listener will be called.
+ * @see ConfigurationListenerHolder#listeners
+ */
+ void addListener(L listener, long notificationNumber) {
+ containers.add(new Container<>(listener, notificationNumber + 1));
+ }
+
+ /**
+ * Removes the listener.
+ *
+ * <p>The removal is delegated to {@link List#remove(Object)} with a probe container whose {@code equals} compares listeners
+ * by reference ({@code ==}); the notification number of the probe ({@code -1}) is not used in the comparison.
+ * Per the {@link List#remove(Object)} contract only the first matching container is removed.
+ *
+ * <p>NOTE: This method introduces unpredictable behavior at the moment, because the final behavior of this method is unclear.
+ * Should the listener be removed immediately or only on the next notification? We'll fix it later if there's a problem.
+ *
+ * @param listener Configuration change listener.
+ */
+ void removeListener(L listener) {
+ containers.remove(new Container<>(listener, -1) {
+ /** {@inheritDoc} */
+ @Override
+ public boolean equals(Object obj) {
+ return listener == ((Container<L>) obj).listener;
+ }
+ });
+ }
+
+ /**
+ * Returns an iterator of the listeners for the {@code notificationNumber} (those that were added for it or before it),
+ * that is, the listeners whose stored notification number is less than or equal to {@code notificationNumber}.
+ *
+ * <p>The returned iterator wraps the iterator of the backing list obtained at the moment of this call and looks one element
+ * ahead: the next matching listener is resolved when the iterator is created and again on every {@link Iterator#next} call.
+ *
+ * <p>NOTE: {@link Iterator#remove} is not supported.
+ *
+ * <p>NOTE(review): a {@code null} listener would be treated as the end of the iteration, because {@code null} is used as the
+ * "no more elements" marker - confirm that {@code null} listeners are never added.
+ *
+ * @param notificationNumber Configuration notification listener number.
+ * @see ConfigurationListenerHolder#addListener
+ */
+ Iterator<L> listeners(long notificationNumber) {
+ Iterator<Container<L>> it = containers.iterator();
+
+ return new Iterator<L>() {
+ @Nullable L curr = advance();
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean hasNext() {
+ return curr != null;
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public L next() {
+ L next = curr;
+
+ if (next == null) {
+ throw new NoSuchElementException();
+ }
+
+ curr = advance();
+
+ return next;
+ }
+
+ @Nullable L advance() {
+ while (it.hasNext()) {
+ Container<L> next = it.next();
+
+ if (next.notificationNumber <= notificationNumber) {
+ return next.listener;
+ }
+ }
+
+ return null;
+ }
+ };
+ }
+
+ /**
+ * Removes all listeners.
+ */
+ void clear() {
+ containers.clear();
+ }
+
+ /**
+ * Configuration change listener container: pairs a listener with the notification number that {@link #listeners} compares
+ * against the requested one.
+ *
+ * <p>NOTE(review): the second constructor parameter is named {@code storageRevision}, but its value is stored in
+ * {@code notificationNumber}; the name looks like a leftover - confirm and rename.
+ */
+ private static class Container<L> {
+ final L listener;
+
+ final long notificationNumber;
+
+ Container(L listener, long storageRevision) {
+ this.listener = listener;
+ this.notificationNumber = storageRevision;
+ }
+ }
+}
diff --git
a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationNode.java
b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationNode.java
index d07a7bd..8f7012f 100644
---
a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationNode.java
+++
b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationNode.java
@@ -17,15 +17,12 @@
package org.apache.ignite.internal.configuration;
-import static java.util.Collections.unmodifiableCollection;
-
import java.io.Serializable;
import java.util.ArrayList;
-import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
import java.util.NoSuchElementException;
import java.util.Objects;
-import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.ignite.configuration.ConfigurationListenOnlyException;
import org.apache.ignite.configuration.ConfigurationProperty;
import org.apache.ignite.configuration.RootKey;
@@ -45,7 +42,7 @@ import org.jetbrains.annotations.Nullable;
*/
public abstract class ConfigurationNode<VIEWT> implements
ConfigurationProperty<VIEWT> {
/** Listeners of property update. */
- private final List<ConfigurationListener<VIEWT>> updateListeners = new
CopyOnWriteArrayList<>();
+ private final ConfigurationListenerHolder<ConfigurationListener<VIEWT>>
updateListeners = new ConfigurationListenerHolder<>();
/** Full path to the current node. */
protected final List<String> keys;
@@ -104,22 +101,24 @@ public abstract class ConfigurationNode<VIEWT> implements
ConfigurationProperty<
/** {@inheritDoc} */
@Override
public void listen(ConfigurationListener<VIEWT> listener) {
- updateListeners.add(listener);
+ updateListeners.addListener(listener, changer.notificationCount());
}
/** {@inheritDoc} */
@Override
public void stopListen(ConfigurationListener<VIEWT> listener) {
- updateListeners.remove(listener);
+ updateListeners.removeListener(listener);
}
/**
- * Returns list of update listeners.
+ * Returns an iterator of the listeners for the {@code notificationNumber}
(those that were added for it or before it).
+ *
+ * <p>NOTE: {@link Iterator#remove} - not supported.
*
- * @return List of update listeners.
+ * @param notificationNumber Configuration notification listener number.
*/
- public Collection<ConfigurationListener<VIEWT>> listeners() {
- return unmodifiableCollection(updateListeners);
+ public Iterator<ConfigurationListener<VIEWT>> listeners(long
notificationNumber) {
+ return updateListeners.listeners(notificationNumber);
}
/**
diff --git
a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationRegistry.java
b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationRegistry.java
index 48cbf89..f85f9aa 100644
---
a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationRegistry.java
+++
b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/ConfigurationRegistry.java
@@ -39,11 +39,11 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.Function;
import org.apache.ignite.configuration.ConfigurationTree;
import org.apache.ignite.configuration.RootKey;
@@ -99,7 +99,8 @@ public class ConfigurationRegistry implements IgniteComponent
{
private final ConfigurationAsmGenerator cgen = new
ConfigurationAsmGenerator();
/** Configuration storage revision change listeners. */
- private final List<ConfigurationStorageRevisionListener>
storageRevisionListeners = new CopyOnWriteArrayList<>();
+ private final
ConfigurationListenerHolder<ConfigurationStorageRevisionListener>
storageRevisionListeners =
+ new ConfigurationListenerHolder<>();
/**
* Constructor.
@@ -275,12 +276,14 @@ public class ConfigurationRegistry implements
IgniteComponent {
* @param oldSuperRoot Old roots values. All these roots always belong to
a single storage.
* @param newSuperRoot New values for the same roots as in {@code oldRoot}.
* @param storageRevision Revision of the storage.
+ * @param notificationNumber Current configuration listener notification
number.
* @return Future that must signify when processing is completed.
*/
private CompletableFuture<Void> notificator(
@Nullable SuperRoot oldSuperRoot,
SuperRoot newSuperRoot,
- long storageRevision
+ long storageRevision,
+ long notificationNumber
) {
Collection<CompletableFuture<?>> futures = new ArrayList<>();
@@ -302,13 +305,13 @@ public class ConfigurationRegistry implements
IgniteComponent {
oldRoot = null;
}
- futures.addAll(notifyListeners(oldRoot, newRoot, config,
storageRevision));
+ futures.addAll(notifyListeners(oldRoot, newRoot, config,
storageRevision, notificationNumber));
return null;
}
}, true);
- futures.addAll(notifyStorageRevisionListeners(storageRevision));
+ futures.addAll(notifyStorageRevisionListeners(storageRevision,
notificationNumber));
if (futures.isEmpty()) {
return CompletableFuture.completedFuture(null);
@@ -329,19 +332,24 @@ public class ConfigurationRegistry implements
IgniteComponent {
/**
* Adds configuration storage revision change listener.
*
+ * <p>NOTE: If this method is called from another listener, then the added listener is
guaranteed to be called starting from the next configuration
+ * update only.
+ *
* @param listener Listener.
*/
public void
listenUpdateStorageRevision(ConfigurationStorageRevisionListener listener) {
- storageRevisionListeners.add(listener);
+ storageRevisionListeners.addListener(listener,
changer.notificationCount());
}
/**
* Removes configuration storage revision change listener.
*
+ * <p>NOTE: The behavior is unpredictable if this method is called from inside
another listener.
+ *
* @param listener Listener.
*/
public void
stopListenUpdateStorageRevision(ConfigurationStorageRevisionListener listener) {
- storageRevisionListeners.remove(listener);
+ storageRevisionListeners.removeListener(listener);
}
/**
@@ -464,14 +472,17 @@ public class ConfigurationRegistry implements
IgniteComponent {
}
}
- private Collection<CompletableFuture<?>>
notifyStorageRevisionListeners(long storageRevision) {
- if (storageRevisionListeners.isEmpty()) {
- return List.of();
- }
+ private Collection<CompletableFuture<?>>
notifyStorageRevisionListeners(long storageRevision, long notificationNumber) {
+ // Lazy init.
+ List<CompletableFuture<?>> futures = null;
+
+ for (Iterator<ConfigurationStorageRevisionListener> it =
storageRevisionListeners.listeners(notificationNumber); it.hasNext(); ) {
+ if (futures == null) {
+ futures = new ArrayList<>();
+ }
- List<CompletableFuture<?>> futures = new
ArrayList<>(storageRevisionListeners.size());
+ ConfigurationStorageRevisionListener listener = it.next();
- for (ConfigurationStorageRevisionListener listener :
storageRevisionListeners) {
try {
CompletableFuture<?> future =
listener.onUpdate(storageRevision);
@@ -485,6 +496,16 @@ public class ConfigurationRegistry implements
IgniteComponent {
}
}
- return futures;
+ return futures == null ? List.of() : futures;
+ }
+
+ /**
+ * Returns the count of configuration listener notifications.
+ *
+ * <p>Monotonically increasing value that should be incremented each time
an attempt is made to notify all listeners of the
+ * configuration. This makes it possible to guarantee that new listeners will be called
only starting from the next notification of all configuration listeners.
+ */
+ public long notificationCount() {
+ return changer.notificationCount();
}
}
diff --git
a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/DynamicConfigurationChanger.java
b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/DynamicConfigurationChanger.java
index 447f308..d4756f7 100644
---
a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/DynamicConfigurationChanger.java
+++
b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/DynamicConfigurationChanger.java
@@ -42,7 +42,6 @@ public interface DynamicConfigurationChanger {
* Get root node by root key.
*
* @param rootKey Root key.
- * @return Root node.
*/
InnerNode getRootNode(RootKey<?, ?> rootKey);
@@ -54,4 +53,12 @@ public interface DynamicConfigurationChanger {
* @throws NoSuchElementException If no value could be found.
*/
<T> T getLatest(List<KeyPathNode> path);
+
+ /**
+ * Returns the count of configuration listener notifications.
+ *
+ * <p>Monotonically increasing value that should be incremented each time
an attempt is made to notify all listeners of the
+ * configuration. This makes it possible to guarantee that new listeners will be called
only starting from the next notification of all configuration listeners.
+ */
+ long notificationCount();
}
diff --git
a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/NamedListConfiguration.java
b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/NamedListConfiguration.java
index 520c165..5a732e1 100644
---
a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/NamedListConfiguration.java
+++
b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/NamedListConfiguration.java
@@ -18,11 +18,11 @@
package org.apache.ignite.internal.configuration;
import java.util.Collections;
+import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import java.util.concurrent.CopyOnWriteArrayList;
import java.util.function.BiFunction;
import org.apache.ignite.configuration.ConfigurationProperty;
import org.apache.ignite.configuration.NamedConfigurationTree;
@@ -43,7 +43,8 @@ public class NamedListConfiguration<T extends
ConfigurationProperty<VIEWT>, VIEW
extends DynamicConfiguration<NamedListView<VIEWT>,
NamedListChange<VIEWT, CHANGET>>
implements NamedConfigurationTree<T, VIEWT, CHANGET> {
/** Listeners of property update. */
- private final List<ConfigurationNamedListListener<VIEWT>>
extendedListeners = new CopyOnWriteArrayList<>();
+ private final
ConfigurationListenerHolder<ConfigurationNamedListListener<VIEWT>>
extendedListeners =
+ new ConfigurationListenerHolder<>();
/** Creator of named configuration. */
private final BiFunction<List<String>, String, T> cfgCreator;
@@ -155,25 +156,27 @@ public class NamedListConfiguration<T extends
ConfigurationProperty<VIEWT>, VIEW
return Collections.unmodifiableMap(res);
}
- /**
- * Returns list of listeners that are specific for named configurations.
- *
- * @return List of listeners that are specific for named configurations.
- */
- public List<ConfigurationNamedListListener<VIEWT>> extendedListeners() {
- return Collections.unmodifiableList(extendedListeners);
- }
-
/** {@inheritDoc} */
@Override
public void listenElements(ConfigurationNamedListListener<VIEWT> listener)
{
- extendedListeners.add(listener);
+ extendedListeners.addListener(listener, changer.notificationCount());
}
/** {@inheritDoc} */
@Override
public void stopListenElements(ConfigurationNamedListListener<VIEWT>
listener) {
- extendedListeners.remove(listener);
+ extendedListeners.removeListener(listener);
+ }
+
+ /**
+ * Returns an iterator of the listeners for the {@code notificationNumber}
(those that were added for it or before it).
+ *
+ * <p>NOTE: {@link Iterator#remove} - not supported.
+ *
+ * @param notificationNumber Configuration notification listener number.
+ */
+ public Iterator<ConfigurationNamedListListener<VIEWT>>
extendedListeners(long notificationNumber) {
+ return extendedListeners.listeners(notificationNumber);
}
/** {@inheritDoc} */
diff --git
a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/notifications/ConfigurationNotificationContext.java
b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/notifications/ConfigurationNotificationContext.java
index 3aafef1..f7a73ff 100644
---
a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/notifications/ConfigurationNotificationContext.java
+++
b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/notifications/ConfigurationNotificationContext.java
@@ -29,7 +29,7 @@ import org.jetbrains.annotations.Nullable;
* Context to notify configuration listeners.
*/
class ConfigurationNotificationContext {
- /** Storage revision. */
+ /** Current configuration storage revision. */
private final long storageRevision;
/** The tail of containers, implements a stack for safe traversal in
{@link ConfigurationNotificationEventImpl events}. */
@@ -39,13 +39,18 @@ class ConfigurationNotificationContext {
/** For collect configuration listener futures. */
final Collection<CompletableFuture<?>> futures = new ArrayList<>();
+ /** Current configuration listener notification number. */
+ final long notificationNum;
+
/**
* Constructor.
*
* @param storageRevision Storage revision.
+ * @param notificationNum Current configuration listener notification
number.
*/
- ConfigurationNotificationContext(long storageRevision) {
+ ConfigurationNotificationContext(long storageRevision, long
notificationNum) {
this.storageRevision = storageRevision;
+ this.notificationNum = notificationNum;
}
/**
diff --git
a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/notifications/ConfigurationNotificationUtils.java
b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/notifications/ConfigurationNotificationUtils.java
index db3306b..a724624 100644
---
a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/notifications/ConfigurationNotificationUtils.java
+++
b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/notifications/ConfigurationNotificationUtils.java
@@ -17,9 +17,11 @@
package org.apache.ignite.internal.configuration.notifications;
+import static java.util.Collections.emptyIterator;
+
import java.io.Serializable;
import java.util.Collection;
-import java.util.List;
+import java.util.Iterator;
import java.util.Objects;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -40,19 +42,17 @@ class ConfigurationNotificationUtils {
* Private constructor.
*/
private ConfigurationNotificationUtils() {
+ // No op.
}
/**
* Returns the dynamic property of the leaf.
*
* @param dynamicConfig Dynamic configuration.
- * @param nodeName Name of the child node.
+ * @param nodeName Name of the child node.
* @return Dynamic property of a leaf or {@code null} if the leaf does not
exist.
*/
- static @Nullable DynamicProperty<Serializable> dynamicProperty(
- DynamicConfiguration<InnerNode, ?> dynamicConfig,
- String nodeName
- ) {
+ static @Nullable DynamicProperty<Serializable>
dynamicProperty(DynamicConfiguration<InnerNode, ?> dynamicConfig, String
nodeName) {
return (DynamicProperty<Serializable>)
dynamicConfig.members().get(nodeName);
}
@@ -60,13 +60,10 @@ class ConfigurationNotificationUtils {
* Returns the dynamic configuration of the child node.
*
* @param dynamicConfig Dynamic configuration.
- * @param nodeName Name of the child node.
+ * @param nodeName Name of the child node.
* @return Dynamic configuration of the child node or {@code null} if the
child node does not exist.
*/
- static @Nullable DynamicConfiguration<InnerNode, ?> dynamicConfig(
- DynamicConfiguration<InnerNode, ?> dynamicConfig,
- String nodeName
- ) {
+ static @Nullable DynamicConfiguration<InnerNode, ?>
dynamicConfig(DynamicConfiguration<InnerNode, ?> dynamicConfig, String
nodeName) {
return (DynamicConfiguration<InnerNode, ?>)
dynamicConfig.members().get(nodeName);
}
@@ -74,7 +71,7 @@ class ConfigurationNotificationUtils {
* Returns the named dynamic configuration of the child node.
*
* @param dynamicConfig Dynamic configuration.
- * @param nodeName Name of the child node.
+ * @param nodeName Name of the child node.
* @return Named dynamic configuration of the child node or {@code null}
if the child node does not exist.
*/
static @Nullable NamedListConfiguration<?, InnerNode, ?>
namedDynamicConfig(
@@ -85,25 +82,26 @@ class ConfigurationNotificationUtils {
}
/**
- * Null-safe version of {@link ConfigurationNode#listeners()}.
- *
- * <p>Needed for working with "any" configuration properties, see this
class' javadoc for details.
+ * Null-safe version of {@link ConfigurationNode#listeners(long)}.
*
- * @return Listeners of the given node or an empty list if it is {@code
null}.
+ * @param node Configuration tree node.
+ * @param notificationNumber Configuration notification listener number.
*/
- static <T> Collection<ConfigurationListener<T>> listeners(@Nullable
ConfigurationNode<T> node) {
- return node == null ? List.of() : node.listeners();
+ static <T> Iterator<ConfigurationListener<T>> listeners(@Nullable
ConfigurationNode<T> node, long notificationNumber) {
+ return node == null ? emptyIterator() :
node.listeners(notificationNumber);
}
/**
- * Null-safe version of {@link NamedListConfiguration#extendedListeners()}.
+ * Null-safe version of {@link
NamedListConfiguration#extendedListeners(long)}.
*
- * <p>Needed for working with "any" configuration properties, see this
class' javadoc for details.
- *
- * @return Listeners of the given node or an empty list if it is {@code
null}.
+ * @param node Named list configuration.
+ * @param notificationNumber Configuration notification listener number.
*/
- static <T> Collection<ConfigurationNamedListListener<T>>
extendedListeners(@Nullable NamedListConfiguration<?, T, ?> node) {
- return node == null ? List.of() : node.extendedListeners();
+ static <T> Iterator<ConfigurationNamedListListener<T>> extendedListeners(
+ @Nullable NamedListConfiguration<?, T, ?> node,
+ long notificationNumber
+ ) {
+ return node == null ? emptyIterator() :
node.extendedListeners(notificationNumber);
}
/**
diff --git
a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/notifications/ConfigurationNotifier.java
b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/notifications/ConfigurationNotifier.java
index 4be6808..e0791c9 100644
---
a/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/notifications/ConfigurationNotifier.java
+++
b/modules/configuration/src/main/java/org/apache/ignite/internal/configuration/notifications/ConfigurationNotifier.java
@@ -33,6 +33,7 @@ import static
org.apache.ignite.internal.util.CollectionUtils.viewReadOnly;
import java.io.Serializable;
import java.util.Collection;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
@@ -41,7 +42,6 @@ import org.apache.ignite.configuration.ConfigurationProperty;
import org.apache.ignite.configuration.notifications.ConfigurationListener;
import
org.apache.ignite.configuration.notifications.ConfigurationNamedListListener;
import
org.apache.ignite.configuration.notifications.ConfigurationNotificationEvent;
-import org.apache.ignite.internal.configuration.ConfigurationNode;
import org.apache.ignite.internal.configuration.DynamicConfiguration;
import org.apache.ignite.internal.configuration.NamedListConfiguration;
import org.apache.ignite.internal.configuration.tree.ConfigurationVisitor;
@@ -64,6 +64,7 @@ public class ConfigurationNotifier {
* @param newInnerNode New configuration values root.
* @param config Public configuration tree node corresponding to the
current inner nodes.
* @param storageRevision Storage revision.
+ * @param notificationNumber Notification number.
* @return Collected configuration listener futures.
* @see ConfigurationListener
* @see ConfigurationNamedListListener
@@ -72,13 +73,14 @@ public class ConfigurationNotifier {
@Nullable InnerNode oldInnerNode,
InnerNode newInnerNode,
DynamicConfiguration<InnerNode, ?> config,
- long storageRevision
+ long storageRevision,
+ long notificationNumber
) {
if (oldInnerNode == newInnerNode) {
return List.of();
}
- ConfigurationNotificationContext notificationCtx = new
ConfigurationNotificationContext(storageRevision);
+ ConfigurationNotificationContext notificationCtx = new
ConfigurationNotificationContext(storageRevision, notificationNumber);
notificationCtx.addContainer(config, null);
@@ -98,7 +100,7 @@ public class ConfigurationNotifier {
InnerNode newInnerNode,
DynamicConfiguration<InnerNode, ?> config,
Collection<DynamicConfiguration<InnerNode, ?>> anyConfigs,
- ConfigurationNotificationContext notificationCtx
+ ConfigurationNotificationContext ctx
) {
assert !(config instanceof NamedListConfiguration);
@@ -107,11 +109,11 @@ public class ConfigurationNotifier {
}
notifyPublicListeners(
- config.listeners(),
- viewReadOnly(anyConfigs, ConfigurationNode::listeners),
+ listeners(config, ctx.notificationNum),
+ concat(viewReadOnly(anyConfigs, anyCfg -> listeners(anyCfg,
ctx.notificationNum))),
oldInnerNode.specificNode(),
newInnerNode.specificNode(),
- notificationCtx,
+ ctx,
ConfigurationListener::onUpdate
);
@@ -131,11 +133,11 @@ public class ConfigurationNotifier {
if (newLeaf != oldLeaf) {
notifyPublicListeners(
- listeners(dynamicProperty(config, key)),
- viewReadOnly(anyConfigs, anyConfig ->
listeners(dynamicProperty(anyConfig, key))),
+ listeners(dynamicProperty(config, key),
ctx.notificationNum),
+ concat(viewReadOnly(anyConfigs, anyCfg ->
listeners(dynamicProperty(anyCfg, key), ctx.notificationNum))),
oldLeaf,
newLeaf,
- notificationCtx,
+ ctx,
ConfigurationListener::onUpdate
);
}
@@ -150,17 +152,17 @@ public class ConfigurationNotifier {
DynamicConfiguration<InnerNode, ?> newConfig =
dynamicConfig(config, key);
- notificationCtx.addContainer(newConfig, null);
+ ctx.addContainer(newConfig, null);
notifyListeners(
oldNode,
newNode,
newConfig,
- viewReadOnly(anyConfigs, anyConfig ->
dynamicConfig(anyConfig, key)),
- notificationCtx
+ viewReadOnly(anyConfigs, anyCfg ->
dynamicConfig(anyCfg, key)),
+ ctx
);
- notificationCtx.removeContainer(newConfig);
+ ctx.removeContainer(newConfig);
return null;
}
@@ -173,11 +175,11 @@ public class ConfigurationNotifier {
if (newNamedList != oldNamedList) {
notifyPublicListeners(
- listeners(namedDynamicConfig(config, key)),
- viewReadOnly(anyConfigs, anyConfig ->
listeners(namedDynamicConfig(anyConfig, key))),
+ listeners(namedDynamicConfig(config, key),
ctx.notificationNum),
+ concat(viewReadOnly(anyConfigs, anyCfg ->
listeners(namedDynamicConfig(anyCfg, key), ctx.notificationNum))),
oldNamedList,
newNamedList,
- notificationCtx,
+ ctx,
ConfigurationListener::onUpdate
);
@@ -196,22 +198,25 @@ public class ConfigurationNotifier {
touch(newNodeCfg);
- notificationCtx.addContainer(newNodeCfg, name);
+ ctx.addContainer(newNodeCfg, name);
InnerNode newVal = newNamedList.getInnerNode(name);
notifyPublicListeners(
- extendedListeners(namedDynamicConfig(config,
key)),
- viewReadOnly(anyConfigs, anyConfig ->
extendedListeners(namedDynamicConfig(anyConfig, key))),
+ extendedListeners(namedDynamicConfig(config,
key), ctx.notificationNum),
+ concat(viewReadOnly(
+ anyConfigs,
+ anyCfg ->
extendedListeners(namedDynamicConfig(anyCfg, key), ctx.notificationNum)
+ )),
null,
newVal.specificNode(),
- notificationCtx,
+ ctx,
ConfigurationNamedListListener::onCreate
);
if (newAnyConfigs == null) {
newAnyConfigs = mergeAnyConfigs(
- viewReadOnly(anyConfigs, anyConfig ->
any(namedDynamicConfig(anyConfig, key))),
+ viewReadOnly(anyConfigs, anyCfg ->
any(namedDynamicConfig(anyCfg, key))),
any(namedListCfg)
);
}
@@ -220,10 +225,10 @@ public class ConfigurationNotifier {
newVal,
newNodeCfg,
newAnyConfigs,
- notificationCtx
+ ctx
);
- notificationCtx.removeContainer(newNodeCfg);
+ ctx.removeContainer(newNodeCfg);
}
for (String name : namedListChanges.deleted) {
@@ -232,52 +237,61 @@ public class ConfigurationNotifier {
delNodeCfg.removedFromNamedList();
- notificationCtx.addContainer(delNodeCfg, name);
+ ctx.addContainer(delNodeCfg, name);
InnerNode oldVal = oldNamedList.getInnerNode(name);
notifyPublicListeners(
- extendedListeners(namedDynamicConfig(config,
key)),
- viewReadOnly(anyConfigs, anyConfig ->
extendedListeners(namedDynamicConfig(anyConfig, key))),
+ extendedListeners(namedDynamicConfig(config,
key), ctx.notificationNum),
+ concat(viewReadOnly(
+ anyConfigs,
+ anyCfg ->
extendedListeners(namedDynamicConfig(anyCfg, key), ctx.notificationNum)
+ )),
oldVal.specificNode(),
null,
- notificationCtx,
+ ctx,
ConfigurationNamedListListener::onDelete
);
// Notification for deleted configuration.
notifyPublicListeners(
- listeners(delNodeCfg),
- viewReadOnly(anyConfigs, anyConfig ->
listeners(any(namedDynamicConfig(anyConfig, key)))),
+ listeners(delNodeCfg, ctx.notificationNum),
+ concat(viewReadOnly(
+ anyConfigs,
+ anyCfg ->
listeners(any(namedDynamicConfig(anyCfg, key)), ctx.notificationNum)
+ )),
oldVal.specificNode(),
null,
- notificationCtx,
+ ctx,
ConfigurationListener::onUpdate
);
- notificationCtx.removeContainer(delNodeCfg);
+ ctx.removeContainer(delNodeCfg);
}
for (Map.Entry<String, String> entry :
namedListChanges.renamed.entrySet()) {
DynamicConfiguration<InnerNode, ?> renNodeCfg =
(DynamicConfiguration<InnerNode, ?>)
namedListCfg.members().get(entry.getValue());
- notificationCtx.addContainer(renNodeCfg,
entry.getValue());
+ ctx.addContainer(renNodeCfg, entry.getValue());
InnerNode oldVal =
oldNamedList.getInnerNode(entry.getKey());
InnerNode newVal =
newNamedList.getInnerNode(entry.getValue());
notifyPublicListeners(
- extendedListeners(namedDynamicConfig(config,
key)),
- viewReadOnly(anyConfigs, anyConfig ->
extendedListeners(namedDynamicConfig(anyConfig, key))),
+ extendedListeners(namedDynamicConfig(config,
key), ctx.notificationNum),
+ concat(viewReadOnly(
+ anyConfigs,
+ anyCfg ->
extendedListeners(namedDynamicConfig(anyCfg, key), ctx.notificationNum)
+ )),
oldVal.specificNode(),
newVal.specificNode(),
- notificationCtx,
+ ctx,
(listener, event) ->
listener.onRename(entry.getKey(), entry.getValue(), event)
);
- notificationCtx.removeContainer(renNodeCfg);
+ ctx.removeContainer(renNodeCfg);
}
for (String name : namedListChanges.updated) {
@@ -291,20 +305,23 @@ public class ConfigurationNotifier {
DynamicConfiguration<InnerNode, ?> updNodeCfg =
(DynamicConfiguration<InnerNode, ?>)
namedListCfgMembers.get(name);
- notificationCtx.addContainer(updNodeCfg, name);
+ ctx.addContainer(updNodeCfg, name);
notifyPublicListeners(
- extendedListeners(namedDynamicConfig(config,
key)),
- viewReadOnly(anyConfigs, anyConfig ->
extendedListeners(namedDynamicConfig(anyConfig, key))),
+ extendedListeners(namedDynamicConfig(config,
key), ctx.notificationNum),
+ concat(viewReadOnly(
+ anyConfigs,
+ anyCfg ->
extendedListeners(namedDynamicConfig(anyCfg, key), ctx.notificationNum)
+ )),
oldVal.specificNode(),
newVal.specificNode(),
- notificationCtx,
+ ctx,
ConfigurationNamedListListener::onUpdate
);
if (newAnyConfigs == null) {
newAnyConfigs = mergeAnyConfigs(
- viewReadOnly(anyConfigs, anyConfig ->
any(namedDynamicConfig(anyConfig, key))),
+ viewReadOnly(anyConfigs, anyCfg ->
any(namedDynamicConfig(anyCfg, key))),
any(namedListCfg)
);
}
@@ -314,10 +331,10 @@ public class ConfigurationNotifier {
newVal,
updNodeCfg,
newAnyConfigs,
- notificationCtx
+ ctx
);
- notificationCtx.removeContainer(updNodeCfg);
+ ctx.removeContainer(updNodeCfg);
}
}
@@ -337,16 +354,16 @@ public class ConfigurationNotifier {
InnerNode innerNode,
DynamicConfiguration<InnerNode, ?> config,
Collection<DynamicConfiguration<InnerNode, ?>> anyConfigs,
- ConfigurationNotificationContext notificationCtx
+ ConfigurationNotificationContext ctx
) {
assert !(config instanceof NamedListConfiguration);
notifyPublicListeners(
- config.listeners(),
- viewReadOnly(anyConfigs, ConfigurationNode::listeners),
+ listeners(config, ctx.notificationNum),
+ concat(viewReadOnly(anyConfigs, anyCfg -> listeners(anyCfg,
ctx.notificationNum))),
null,
innerNode.specificNode(),
- notificationCtx,
+ ctx,
ConfigurationListener::onUpdate
);
@@ -355,11 +372,14 @@ public class ConfigurationNotifier {
@Override
public Void visitLeafNode(String key, Serializable leaf) {
notifyPublicListeners(
- listeners(dynamicProperty(config, key)),
- viewReadOnly(anyConfigs, anyConfig ->
listeners(dynamicProperty(anyConfig, key))),
+ listeners(dynamicProperty(config, key),
ctx.notificationNum),
+ concat(viewReadOnly(
+ anyConfigs,
+ anyCfg -> listeners(dynamicProperty(anyCfg,
key), ctx.notificationNum)
+ )),
null,
leaf,
- notificationCtx,
+ ctx,
ConfigurationListener::onUpdate
);
@@ -371,16 +391,16 @@ public class ConfigurationNotifier {
public Void visitInnerNode(String key, InnerNode nestedInnerNode) {
DynamicConfiguration<InnerNode, ?> nestedNodeConfig =
dynamicConfig(config, key);
- notificationCtx.addContainer(nestedNodeConfig, null);
+ ctx.addContainer(nestedNodeConfig, null);
notifyListeners(
nestedInnerNode,
nestedNodeConfig,
- viewReadOnly(anyConfigs, anyConfig ->
dynamicConfig(anyConfig, key)),
- notificationCtx
+ viewReadOnly(anyConfigs, anyCfg ->
dynamicConfig(anyCfg, key)),
+ ctx
);
- notificationCtx.removeContainer(nestedNodeConfig);
+ ctx.removeContainer(nestedNodeConfig);
return null;
}
@@ -389,11 +409,14 @@ public class ConfigurationNotifier {
@Override
public Void visitNamedListNode(String key, NamedListNode<?>
newNamedList) {
notifyPublicListeners(
- listeners(namedDynamicConfig(config, key)),
- viewReadOnly(anyConfigs, anyConfig ->
listeners(namedDynamicConfig(anyConfig, key))),
+ listeners(namedDynamicConfig(config, key),
ctx.notificationNum),
+ concat(viewReadOnly(
+ anyConfigs,
+ anyCfg -> listeners(namedDynamicConfig(anyCfg,
key), ctx.notificationNum)
+ )),
null,
newNamedList,
- notificationCtx,
+ ctx,
ConfigurationListener::onUpdate
);
@@ -406,22 +429,25 @@ public class ConfigurationNotifier {
DynamicConfiguration<InnerNode, ?> namedNodeConfig =
(DynamicConfiguration<InnerNode, ?>)
namedDynamicConfig(config, key).getConfig(name);
- notificationCtx.addContainer(namedNodeConfig, name);
+ ctx.addContainer(namedNodeConfig, name);
InnerNode namedInnerNode = newNamedList.getInnerNode(name);
notifyPublicListeners(
- extendedListeners(namedDynamicConfig(config, key)),
- viewReadOnly(anyConfigs, anyConfig ->
extendedListeners(namedDynamicConfig(anyConfig, key))),
+ extendedListeners(namedDynamicConfig(config, key),
ctx.notificationNum),
+ concat(viewReadOnly(
+ anyConfigs,
+ anyCfg ->
extendedListeners(namedDynamicConfig(anyCfg, key), ctx.notificationNum)
+ )),
null,
namedInnerNode.specificNode(),
- notificationCtx,
+ ctx,
ConfigurationNamedListListener::onCreate
);
if (newAnyConfigs == null) {
newAnyConfigs = mergeAnyConfigs(
- viewReadOnly(anyConfigs, anyConfig ->
any(namedDynamicConfig(anyConfig, key))),
+ viewReadOnly(anyConfigs, anyCfg ->
any(namedDynamicConfig(anyCfg, key))),
any(namedDynamicConfig(config, key))
);
}
@@ -430,10 +456,10 @@ public class ConfigurationNotifier {
namedInnerNode,
namedNodeConfig,
newAnyConfigs,
- notificationCtx
+ ctx
);
- notificationCtx.removeContainer(namedNodeConfig);
+ ctx.removeContainer(namedNodeConfig);
}
return null;
@@ -442,8 +468,8 @@ public class ConfigurationNotifier {
}
private static <L extends ConfigurationListener<?>> void
notifyPublicListeners(
- Collection<? extends L> configListeners,
- Collection<? extends Collection<? extends L>> anyListeners,
+ Iterator<? extends L> configListeners,
+ Iterator<? extends L> anyListeners,
@Nullable Object oldValue,
@Nullable Object newValue,
ConfigurationNotificationContext notificationCtx,
@@ -452,25 +478,21 @@ public class ConfigurationNotifier {
// Lazy set.
ConfigurationNotificationEvent<?> event = null;
- for (Collection<? extends L> listeners : concat(anyListeners,
List.of(configListeners))) {
- if (!listeners.isEmpty()) {
- if (event == null) {
- event = notificationCtx.createEvent(oldValue, newValue);
- }
+ for (Iterator<? extends L> it = concat(anyListeners, configListeners);
it.hasNext(); ) {
+ if (event == null) {
+ event = notificationCtx.createEvent(oldValue, newValue);
+ }
- for (L listener : listeners) {
- try {
- CompletableFuture<?> future =
invokeListener.apply(listener, event);
+ try {
+ CompletableFuture<?> future = invokeListener.apply(it.next(),
event);
- assert future != null : invokeListener;
+ assert future != null : invokeListener;
- if (future.isCompletedExceptionally() ||
future.isCancelled() || !future.isDone()) {
- notificationCtx.futures.add(future);
- }
- } catch (Throwable t) {
-
notificationCtx.futures.add(CompletableFuture.failedFuture(t));
- }
+ if (future.isCompletedExceptionally() || future.isCancelled()
|| !future.isDone()) {
+ notificationCtx.futures.add(future);
}
+ } catch (Throwable t) {
+ notificationCtx.futures.add(CompletableFuture.failedFuture(t));
}
}
}
diff --git
a/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/ConfigurationListenerHolderTest.java
b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/ConfigurationListenerHolderTest.java
new file mode 100644
index 0000000..4605ca8
--- /dev/null
+++
b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/ConfigurationListenerHolderTest.java
@@ -0,0 +1,122 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.ignite.internal.configuration;
+
+import static java.util.stream.Collectors.toList;
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Spliterators;
+import java.util.stream.StreamSupport;
+import org.junit.jupiter.api.BeforeEach;
+import org.junit.jupiter.api.Test;
+
+/**
+ * Class to test the {@link ConfigurationListenerHolder}.
+ */
+public class ConfigurationListenerHolderTest {
+ private ConfigurationListenerHolder<? super Object> holder;
+
+ @BeforeEach
+ void beforeEach() {
+ holder = new ConfigurationListenerHolder<>();
+ }
+
+ @Test
+ void testAddListeners() {
+ holder.addListener(1, 0);
+ holder.addListener(2, 0);
+ holder.addListener(1, 0);
+ holder.addListener(3, 0);
+
+ assertEquals(
+ List.of(1, 2, 1, 3),
+ collect(holder.listeners(1))
+ );
+ }
+
+ @Test
+ void testRemoveListeners() {
+ holder.addListener(1, 0);
+ holder.addListener(2, 0);
+ holder.addListener(1, 0);
+
+ holder.removeListener(1);
+
+ assertEquals(
+ List.of(2, 1),
+ collect(holder.listeners(1))
+ );
+ }
+
+ @Test
+ void testListeners() {
+ assertTrue(collect(holder.listeners(-1)).isEmpty());
+ assertTrue(collect(holder.listeners(0)).isEmpty());
+ assertTrue(collect(holder.listeners(1)).isEmpty());
+
+ holder.addListener(1, 0);
+ holder.addListener(2, 0);
+ holder.addListener(1, 0);
+ holder.addListener(3, 0);
+
+ holder.addListener(4, 1);
+ holder.addListener(5, 1);
+
+ holder.addListener(7, 2);
+ holder.addListener(8, 2);
+
+ assertTrue(collect(holder.listeners(-1)).isEmpty());
+
+ assertEquals(
+ List.of(1, 2, 1, 3),
+ collect(holder.listeners(1))
+ );
+
+ assertEquals(
+ List.of(1, 2, 1, 3, 4, 5),
+ collect(holder.listeners(2))
+ );
+
+ assertEquals(
+ List.of(1, 2, 1, 3, 4, 5, 7, 8),
+ collect(holder.listeners(3))
+ );
+
+ assertEquals(
+ List.of(1, 2, 1, 3, 4, 5, 7, 8),
+ collect(holder.listeners(4))
+ );
+ }
+
+ @Test
+ void testClear() {
+ holder.addListener(1, 0);
+ holder.addListener(2, 0);
+
+ holder.clear();
+
+ assertTrue(collect(holder.listeners(1)).isEmpty());
+ }
+
+ private List<?> collect(Iterator<?> iterator) {
+ return
StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, 0),
false).collect(toList());
+ }
+}
diff --git
a/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/TestConfigurationChanger.java
b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/TestConfigurationChanger.java
index 8260c4f..2c5add8 100644
---
a/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/TestConfigurationChanger.java
+++
b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/TestConfigurationChanger.java
@@ -61,7 +61,7 @@ public class TestConfigurationChanger extends
ConfigurationChanger {
Collection<Class<?>> polymorphicSchemaExtensions
) {
super(
- (oldRoot, newRoot, revision) -> completedFuture(null),
+ (oldRoot, newRoot, revision, notificationNumber) ->
completedFuture(null),
rootKeys,
validators,
storage
diff --git
a/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/notifications/ConfigurationListenerTest.java
b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/notifications/ConfigurationListenerTest.java
index a53e448..903080d 100644
---
a/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/notifications/ConfigurationListenerTest.java
+++
b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/notifications/ConfigurationListenerTest.java
@@ -35,6 +35,7 @@ import static
org.apache.ignite.internal.testframework.IgniteTestUtils.hasCause;
import static org.hamcrest.MatcherAssert.assertThat;
import static org.hamcrest.Matchers.is;
import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertInstanceOf;
import static org.junit.jupiter.api.Assertions.assertNotEquals;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -1457,7 +1458,8 @@ public class ConfigurationListenerTest {
null,
(InnerNode) config.value(),
(DynamicConfiguration) config,
- 0
+ 0,
+ registry.notificationCount() + 1
);
for (CompletableFuture<?> fut : futs) {
@@ -1591,4 +1593,82 @@ public class ConfigurationListenerTest {
assertTrue(invokeListener.get());
}
+
+ @Test
+ void testIncreaseNotificationCount() throws Exception {
+ long notificationCount = registry.notificationCount();
+
+ assertTrue(notificationCount >= 0);
+
+ config.child().str().update(randomUuid()).get(1, SECONDS);
+
+ assertEquals(notificationCount + 1, registry.notificationCount());
+
+ registry.notifyCurrentConfigurationListeners().get(1, SECONDS);
+
+ assertEquals(notificationCount + 2, registry.notificationCount());
+ }
+
+ @Test
+ void testNotifyListenersOnNextUpdateConfiguration() throws Exception {
+ List<String> events = new ArrayList<>();
+
+ AtomicBoolean addListeners = new AtomicBoolean(true);
+
+ config.listen(configListener(ctx0 -> {
+ events.add("root");
+
+ if (addListeners.get()) {
+
registry.listenUpdateStorageRevision(configStorageRevisionListener(ctx ->
events.add("storageRevision")));
+
+ config.child().listen(configListener(ctx1 ->
events.add("child")));
+
+ config.child().str().listen(configListener(ctx1 ->
events.add("child.str")));
+
+ config.children().listen(configListener(ctx1 ->
events.add("children")));
+
+
config.children().listenElements(configNamedListenerOnCreate(ctx1 ->
events.add("children.onCreate")));
+
+
config.children().listenElements(configNamedListenerOnUpdate(ctx1 ->
events.add("children.onUpdate")));
+
+ config.children().get("0").listen(configListener(ctx1 ->
events.add("children.0")));
+
+ config.children().get("0").str().listen(configListener(ctx1 ->
events.add("children.0.str")));
+
+ config.children().any().listen(configListener(ctx1 ->
events.add("children.any")));
+
+ config.children().any().str().listen(configListener(ctx1 ->
events.add("children.any.str")));
+
+ addListeners.set(false);
+ }
+ }));
+
+ config.change(
+ c0 -> c0.changeChild(c1 ->
c1.changeStr(randomUuid())).changeChildren(c1 -> c1.create("0",
doNothingConsumer()))
+ ).get(1, SECONDS);
+
+ assertEquals(List.of("root"), events);
+
+ assertFalse(addListeners.get());
+
+ events.clear();
+
+ config.change(c0 -> c0.changeChild(c1 -> c1.changeStr(randomUuid()))
+ .changeChildren(c1 -> c1.create("1",
doNothingConsumer()).update("0", c2 -> c2.changeStr(randomUuid())))
+ ).get(1, SECONDS);
+
+ assertEquals(
+ List.of(
+ "root",
+ "child", "child.str",
+ "children",
+ "children.onCreate", "children.any",
"children.any.str",
+ "children.onUpdate", "children.any", "children.0",
"children.any.str", "children.0.str",
+ "storageRevision"
+ ),
+ events
+ );
+
+ assertFalse(addListeners.get());
+ }
}
diff --git
a/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/testframework/ConfigurationExtension.java
b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/testframework/ConfigurationExtension.java
index 4bb8348..dd242b6 100644
---
a/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/testframework/ConfigurationExtension.java
+++
b/modules/configuration/src/test/java/org/apache/ignite/internal/configuration/testframework/ConfigurationExtension.java
@@ -35,7 +35,7 @@ import java.util.Collection;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
-import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Function;
import org.apache.ignite.configuration.RootKey;
@@ -199,7 +199,9 @@ public class ConfigurationExtension implements
BeforeEachCallback, AfterEachCall
var cfgRef = new AtomicReference<DynamicConfiguration<?, ?>>();
cfgRef.set(cgen.instantiateCfg(rootKey, new
DynamicConfigurationChanger() {
- private final AtomicInteger storageRev = new AtomicInteger();
+ private final AtomicLong storageRev = new AtomicLong();
+
+ private final AtomicLong notificationListenerCnt = new
AtomicLong();
/** {@inheritDoc} */
@Override
@@ -218,7 +220,8 @@ public class ConfigurationExtension implements
BeforeEachCallback, AfterEachCall
sr.getRoot(rootKey),
copy.getRoot(rootKey),
(DynamicConfiguration<InnerNode, ?>)
cfgRef.get(),
- storageRev.incrementAndGet()
+ storageRev.incrementAndGet(),
+ notificationListenerCnt.incrementAndGet()
);
return
CompletableFuture.allOf(futures.toArray(CompletableFuture[]::new));
@@ -239,6 +242,12 @@ public class ConfigurationExtension implements
BeforeEachCallback, AfterEachCall
public <T> T getLatest(List<KeyPathNode> path) {
return findEx(path, superRootRef.get());
}
+
+ /** {@inheritDoc} */
+ @Override
+ public long notificationCount() {
+ return notificationListenerCnt.get();
+ }
}));
touch(cfgRef.get());
diff --git
a/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
b/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
index 335d34c..e737609 100644
---
a/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
+++
b/modules/core/src/main/java/org/apache/ignite/internal/util/CollectionUtils.java
@@ -42,6 +42,11 @@ import org.jetbrains.annotations.Nullable;
* Utility class provides various method to work with collections.
*/
public final class CollectionUtils {
+ /** Private constructor: utility class, not meant to be instantiated. */
+ private CollectionUtils() {
+ // No op.
+ }
+
/**
* Tests if the given collection is either {@code null} or empty.
*
@@ -91,7 +96,7 @@ public final class CollectionUtils {
* Union set and items.
*
* @param set Set.
- * @param ts Items.
+ * @param ts Items.
* @param <T> Type of the elements of set and items..
* @return Immutable union of set and items.
*/
@@ -116,7 +121,7 @@ public final class CollectionUtils {
* Union collections.
*
* @param collections Collections.
- * @param <T> Type of the elements of collections.
+ * @param <T> Type of the elements of collections.
* @return Immutable union of collections.
*/
@SafeVarargs
@@ -159,7 +164,7 @@ public final class CollectionUtils {
* <p>NOTE: {@link Iterator#remove} - not supported.
*
* @param iterables Iterables.
- * @param <T> Type of the elements.
+ * @param <T> Type of the elements.
* @return Concatenation of iterables.
*/
@SafeVarargs
@@ -198,12 +203,99 @@ public final class CollectionUtils {
}
/**
+ * Create a lazy concatenation of iterators.
+ *
+ * <p>NOTE: {@link Iterator#remove} - not supported.
+ *
+ * @param iterators Iterators.
+ * @param <T> Type of the elements.
+ * @return Concatenation of iterators.
+ */
+ @SafeVarargs
+ public static <T> Iterator<T> concat(@Nullable Iterator<? extends T>...
iterators) {
+ if (iterators == null || iterators.length == 0) {
+ return emptyIterator();
+ }
+
+ return new Iterator<>() {
+ /** Current index at {@code iterators}. */
+ int idx = 0;
+
+ /** Current iterator. */
+ Iterator<? extends T> curr = emptyIterator();
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean hasNext() {
+ while (!curr.hasNext() && idx < iterators.length) {
+ curr = iterators[idx++];
+ }
+
+ return curr.hasNext();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public T next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ } else {
+ return curr.next();
+ }
+ }
+ };
+ }
+
+ /**
+ * Create a lazy concatenation of a collection of iterators.
+ *
+ * <p>NOTE: {@link Iterator#remove} - not supported.
+ *
+ * @param iterators Collection of iterators.
+ * @param <T> Type of the elements.
+ * @return Concatenation of iterators.
+ */
+ public static <T> Iterator<T> concat(@Nullable Collection<Iterator<?
extends T>> iterators) {
+ if (iterators == null || iterators.isEmpty()) {
+ return emptyIterator();
+ }
+
+ return new Iterator<>() {
+ /** Super iterator. */
+ final Iterator<Iterator<? extends T>> it = iterators.iterator();
+
+ /** Current iterator. */
+ Iterator<? extends T> curr = emptyIterator();
+
+ /** {@inheritDoc} */
+ @Override
+ public boolean hasNext() {
+ while (!curr.hasNext() && it.hasNext()) {
+ curr = it.next();
+ }
+
+ return curr.hasNext();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public T next() {
+ if (!hasNext()) {
+ throw new NoSuchElementException();
+ } else {
+ return curr.next();
+ }
+ }
+ };
+ }
+
+ /**
* Create a collection view that can only be read.
*
* @param collection Basic collection.
- * @param mapper Conversion function.
- * @param <T1> Base type of the collection.
- * @param <T2> Type for view.
+ * @param mapper Conversion function.
+ * @param <T1> Base type of the collection.
+ * @param <T2> Type for view.
* @return Read-only collection view.
*/
public static <T1, T2> Collection<T2> viewReadOnly(
@@ -256,8 +348,8 @@ public final class CollectionUtils {
/**
* Difference of two sets.
*
- * @param a First set.
- * @param b Second set.
+ * @param a First set.
+ * @param b Second set.
* @param <T> Type of the elements.
* @return Immutable set of elements of the first without the second.
*/
@@ -284,11 +376,6 @@ public final class CollectionUtils {
return res == null ? Set.of() : unmodifiableSet(res);
}
- /** Stub. */
- private CollectionUtils() {
- // No op.
- }
-
/**
* Get unmodifiable copy set of specified collection.
*
diff --git
a/modules/core/src/test/java/org/apache/ignite/internal/util/CollectionUtilsTest.java
b/modules/core/src/test/java/org/apache/ignite/internal/util/CollectionUtilsTest.java
index 246a3f1..a0669a7 100644
---
a/modules/core/src/test/java/org/apache/ignite/internal/util/CollectionUtilsTest.java
+++
b/modules/core/src/test/java/org/apache/ignite/internal/util/CollectionUtilsTest.java
@@ -17,6 +17,7 @@
package org.apache.ignite.internal.util;
+import static java.util.Collections.emptyIterator;
import static java.util.function.Function.identity;
import static java.util.stream.Collectors.toList;
import static org.apache.ignite.internal.util.CollectionUtils.concat;
@@ -33,8 +34,10 @@ import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Set;
+import java.util.Spliterators;
import java.util.stream.StreamSupport;
import org.junit.jupiter.api.Test;
@@ -44,8 +47,8 @@ import org.junit.jupiter.api.Test;
public class CollectionUtilsTest {
@Test
void testConcatIterables() {
- assertTrue(collect(concat(null)).isEmpty());
- assertTrue(collect(concat(List.of())).isEmpty());
+ assertTrue(collect(concat((Iterable<?>[]) null)).isEmpty());
+ assertTrue(collect(concat((Iterable<?>) List.of())).isEmpty());
assertTrue(collect(concat(List.of(), List.of())).isEmpty());
assertEquals(List.of(1), collect(concat(List.of(1))));
@@ -111,7 +114,7 @@ public class CollectionUtilsTest {
@Test
void testCollectionUnion() {
assertTrue(union().isEmpty());
- assertTrue(union(new Collection[]{}).isEmpty());
+ assertTrue(union().isEmpty());
assertTrue(union(List.of()).isEmpty());
assertEquals(List.of(1), collect(union(List.of(1), List.of())));
@@ -121,17 +124,6 @@ public class CollectionUtilsTest {
}
/**
- * Collect of elements.
- *
- * @param iterable Iterable.
- * @param <T> Type of the elements.
- * @return Collected elements.
- */
- private <T> List<? extends T> collect(Iterable<? extends T> iterable) {
- return StreamSupport.stream(iterable.spliterator(),
false).collect(toList());
- }
-
- /**
* Test setOf by populated and empty list.
*/
@Test
@@ -159,6 +151,54 @@ public class CollectionUtilsTest {
testSetOf(Collections.emptySet());
}
+ @Test
+ void testConcatIterators() {
+ assertTrue(collect(concat((Iterator<?>[]) null)).isEmpty());
+ assertTrue(collect(concat(emptyIterator())).isEmpty());
+ assertTrue(collect(concat(emptyIterator(),
emptyIterator())).isEmpty());
+ // A single element must survive on its own and with an empty iterator on either side.
+ assertEquals(List.of(1), collect(concat(List.of(1).iterator())));
+ assertEquals(List.of(1), collect(concat(List.of(1).iterator(),
emptyIterator())));
+ assertEquals(List.of(1), collect(concat(emptyIterator(),
List.of(1).iterator())));
+ // Elements are expected in the order in which the iterators were passed.
+ assertEquals(List.of(1, 2, 3), collect(concat(List.of(1).iterator(),
List.of(2, 3).iterator())));
+ }
+
+ @Test
+ void testConcatCollectionOfIterators() {
+ assertTrue(collect(concat((Collection<Iterator<?>>) null)).isEmpty());
+ assertTrue(collect(concat(List.of())).isEmpty());
+ assertTrue(collect(concat(List.of(emptyIterator(),
emptyIterator()))).isEmpty());
+ // A single element must survive on its own and with an empty iterator on either side.
+ assertEquals(List.of(1),
collect(concat(List.of(List.of(1).iterator()))));
+ assertEquals(List.of(1), collect(concat(List.of(List.of(1).iterator(),
emptyIterator()))));
+ assertEquals(List.of(1), collect(concat(List.of(emptyIterator(),
List.of(1).iterator()))));
+ // Elements are expected in the order in which the iterators appear in the collection.
+ assertEquals(List.of(1, 2, 3),
collect(concat(List.of(List.of(1).iterator(), List.of(2, 3).iterator()))));
+ }
+
+ /**
+ * Collects the elements of an iterable into a list by streaming its spliterator sequentially.
+ *
+ * @param iterable Iterable to traverse.
+ * @param <T> Type of the elements.
+ * @return List of the collected elements.
+ */
+ private <T> List<? extends T> collect(Iterable<? extends T> iterable) {
+ return StreamSupport.stream(iterable.spliterator(),
false).collect(toList());
+ }
+
+ /**
+ * Collects the remaining elements of an iterator into a list via a sequential stream.
+ *
+ * @param iterator Iterator to traverse.
+ * @param <T> Type of the elements.
+ * @return List of the collected elements.
+ */
+ private <T> List<? extends T> collect(Iterator<? extends T> iterator) {
+ return
StreamSupport.stream(Spliterators.spliteratorUnknownSize(iterator, 0),
false).collect(toList());
+ }
+
private void testSetOf(Collection<Integer> data) {
Set<Integer> copy = setOf(data);