This is an automated email from the ASF dual-hosted git repository. toulmean pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/incubator-tuweni.git
The following commit(s) were added to refs/heads/main by this push: new 437434be Add a global expiration listener new eb0598c3 Merge pull request #459 from atoulme/add_expiration_listener 437434be is described below commit 437434bef32b42e51ffa1c8a11a6f4fd33063ca2 Author: Antoine Toulme <anto...@lunar-ocean.com> AuthorDate: Sun Jan 1 01:07:46 2023 -0800 Add a global expiration listener --- .../org/apache/tuweni/concurrent/ExpiringMap.java | 53 ++++++++++++++++------ .../org/apache/tuweni/concurrent/ExpiringSet.java | 29 +++++++++--- .../apache/tuweni/concurrent/ExpiringMapTest.java | 18 +++++++- .../apache/tuweni/concurrent/ExpiringSetTest.java | 12 ++++- 4 files changed, 88 insertions(+), 24 deletions(-) diff --git a/concurrent/src/main/java/org/apache/tuweni/concurrent/ExpiringMap.java b/concurrent/src/main/java/org/apache/tuweni/concurrent/ExpiringMap.java index 8fc5c0d6..592efd47 100644 --- a/concurrent/src/main/java/org/apache/tuweni/concurrent/ExpiringMap.java +++ b/concurrent/src/main/java/org/apache/tuweni/concurrent/ExpiringMap.java @@ -62,11 +62,14 @@ public final class ExpiringMap<K, V> implements Map<K, V> { private final LongSupplier currentTimeSupplier; private final Long defaultTimeout; + @Nullable + private final BiConsumer<K, V> globalExpiryListener; + /** * Construct an empty map. */ public ExpiringMap() { - this(System::currentTimeMillis, Long.MAX_VALUE); + this(System::currentTimeMillis, Long.MAX_VALUE, null); } /** @@ -75,12 +78,23 @@ public final class ExpiringMap<K, V> implements Map<K, V> { * @param defaultTimeout the default timeout in milliseconds */ public ExpiringMap(Long defaultTimeout) { - this(System::currentTimeMillis, defaultTimeout); + this(System::currentTimeMillis, defaultTimeout, null); } - ExpiringMap(LongSupplier currentTimeSupplier, Long defaultTimeout) { + /** + * Construct a map with a default timeout value and a global expiration listener. 
+ * + * @param defaultTimeout the default timeout in milliseconds + * @param expiryListener a listener that will be called for each entry expiration + */ + public ExpiringMap(Long defaultTimeout, BiConsumer<K, V> expiryListener) { + this(System::currentTimeMillis, defaultTimeout, expiryListener); + } + + ExpiringMap(LongSupplier currentTimeSupplier, Long defaultTimeout, BiConsumer<K, V> expiryListener) { this.currentTimeSupplier = currentTimeSupplier; this.defaultTimeout = defaultTimeout; + this.globalExpiryListener = expiryListener; } @Nullable @@ -130,7 +144,8 @@ public final class ExpiringMap<K, V> implements Map<K, V> { requireNonNull(key); requireNonNull(value); purgeExpired(); - ExpiringEntry<K, V> oldEntry = storage.put(key, new ExpiringEntry<>(key, value, defaultTimeout, null)); + ExpiringEntry<K, V> oldEntry = + storage.put(key, new ExpiringEntry<>(key, value, defaultTimeout, globalExpiryListener)); return (oldEntry == null) ? null : oldEntry.value; } @@ -146,7 +161,7 @@ public final class ExpiringMap<K, V> implements Map<K, V> { */ @Nullable public V put(K key, V value, long expiry) { - return put(key, value, expiry, null); + return put(key, value, expiry, globalExpiryListener); } /** @@ -179,7 +194,8 @@ public final class ExpiringMap<K, V> implements Map<K, V> { return previous; } - ExpiringEntry<K, V> newEntry = new ExpiringEntry<>(key, value, expiry, expiryListener); + ExpiringEntry<K, V> newEntry = + new ExpiringEntry<>(key, value, expiry, expiryListener == null ? globalExpiryListener : expiryListener); ExpiringEntry<K, V> oldEntry = storage.put(key, newEntry); expiryQueue.offer(newEntry); if (oldEntry != null && oldEntry.expiry < Long.MAX_VALUE) { @@ -193,7 +209,7 @@ public final class ExpiringMap<K, V> implements Map<K, V> { requireNonNull(m); purgeExpired(); for (Map.Entry<? extends K, ? 
extends V> e : m.entrySet()) { - storage.put(e.getKey(), new ExpiringEntry<>(e.getKey(), e.getValue(), defaultTimeout, null)); + storage.put(e.getKey(), new ExpiringEntry<>(e.getKey(), e.getValue(), defaultTimeout, globalExpiryListener)); } } @@ -203,7 +219,8 @@ public final class ExpiringMap<K, V> implements Map<K, V> { requireNonNull(key); requireNonNull(value); purgeExpired(); - ExpiringEntry<K, V> oldEntry = storage.putIfAbsent(key, new ExpiringEntry<>(key, value, defaultTimeout, null)); + ExpiringEntry<K, V> oldEntry = + storage.putIfAbsent(key, new ExpiringEntry<>(key, value, defaultTimeout, globalExpiryListener)); return (oldEntry == null) ? null : oldEntry.value; } @@ -218,7 +235,7 @@ public final class ExpiringMap<K, V> implements Map<K, V> { */ @Nullable public V putIfAbsent(K key, V value, long expiry) { - return putIfAbsent(key, value, expiry, null); + return putIfAbsent(key, value, expiry, globalExpiryListener); } /** @@ -250,7 +267,8 @@ public final class ExpiringMap<K, V> implements Map<K, V> { return previous; } - ExpiringEntry<K, V> newEntry = new ExpiringEntry<>(key, value, expiry, expiryListener); + ExpiringEntry<K, V> newEntry = + new ExpiringEntry<>(key, value, expiry, expiryListener == null ? globalExpiryListener : expiryListener); ExpiringEntry<K, V> oldEntry = storage.putIfAbsent(key, newEntry); if (oldEntry == null) { expiryQueue.offer(newEntry); @@ -292,7 +310,7 @@ public final class ExpiringMap<K, V> implements Map<K, V> { expiryQueue.remove(oldEntry); } V newValue = remappingFunction.apply(k, oldEntry.value); - return (newValue == null) ? null : new ExpiringEntry<>(k, newValue, defaultTimeout, null); + return (newValue == null) ? null : new ExpiringEntry<>(k, newValue, defaultTimeout, globalExpiryListener); }); return (newEntry == null) ? 
null : newEntry.value; } @@ -305,14 +323,15 @@ public final class ExpiringMap<K, V> implements Map<K, V> { expiryQueue.remove(oldEntry); } V newValue = remappingFunction.apply(oldEntry.value, newEntry.value); - return (newValue == null) ? null : new ExpiringEntry<>(key, newValue, defaultTimeout, null); + return (newValue == null) ? null : new ExpiringEntry<>(key, newValue, defaultTimeout, globalExpiryListener); }); return (entry == null) ? null : entry.value; } @Override public V replace(K key, V value) { - ExpiringEntry<K, V> oldEntry = storage.replace(key, new ExpiringEntry<>(key, value, defaultTimeout, null)); + ExpiringEntry<K, V> oldEntry = + storage.replace(key, new ExpiringEntry<>(key, value, defaultTimeout, globalExpiryListener)); if (oldEntry != null) { if (oldEntry.expiry < Long.MAX_VALUE) { expiryQueue.remove(oldEntry); @@ -331,7 +350,7 @@ public final class ExpiringMap<K, V> implements Map<K, V> { if (oldEntry.expiry < Long.MAX_VALUE) { expiryQueue.remove(oldEntry); } - return new ExpiringEntry<>(k, newValue, defaultTimeout, null); + return new ExpiringEntry<>(k, newValue, defaultTimeout, globalExpiryListener); } return oldEntry; }); @@ -344,7 +363,11 @@ public final class ExpiringMap<K, V> implements Map<K, V> { if (oldEntry.expiry < Long.MAX_VALUE) { expiryQueue.remove(oldEntry); } - return new ExpiringEntry<>(k, requireNonNull(function.apply(k, oldEntry.value)), defaultTimeout, null); + return new ExpiringEntry<>( + k, + requireNonNull(function.apply(k, oldEntry.value)), + defaultTimeout, + globalExpiryListener); }); } diff --git a/concurrent/src/main/java/org/apache/tuweni/concurrent/ExpiringSet.java b/concurrent/src/main/java/org/apache/tuweni/concurrent/ExpiringSet.java index 6974d240..97703e49 100644 --- a/concurrent/src/main/java/org/apache/tuweni/concurrent/ExpiringSet.java +++ b/concurrent/src/main/java/org/apache/tuweni/concurrent/ExpiringSet.java @@ -34,6 +34,8 @@ import javax.annotation.Nullable; */ public final class ExpiringSet<E> 
implements Set<E> { + private final Consumer<E> globalExpiryListener; + // Uses object equality, to ensure uniqueness as a value in the storage map private static final class ExpiringEntry<E> implements Comparable<ExpiringEntry<E>> { private E element; @@ -64,22 +66,33 @@ public final class ExpiringSet<E> implements Set<E> { * @param evictionTimeout the default eviction timeout for entries in milliseconds. */ public ExpiringSet(long evictionTimeout) { - this(evictionTimeout, System::currentTimeMillis); + this(evictionTimeout, System::currentTimeMillis, null); + } + + /** + * Construct an empty expiring set. + * + * @param evictionTimeout the default eviction timeout for entries in milliseconds. + * @param expiryListener a listener that will be called for each entry expiration + */ + public ExpiringSet(long evictionTimeout, Consumer<E> expiryListener) { + this(evictionTimeout, System::currentTimeMillis, expiryListener); } /** * Construct an empty set. */ public ExpiringSet() { - this(Long.MAX_VALUE, System::currentTimeMillis); + this(Long.MAX_VALUE, System::currentTimeMillis, null); } - ExpiringSet(long evictionTimeout, LongSupplier currentTimeSupplier) { + ExpiringSet(long evictionTimeout, LongSupplier currentTimeSupplier, Consumer<E> expiryListener) { if (evictionTimeout <= 0) { throw new IllegalArgumentException("Invalid eviction timeout " + evictionTimeout); } this.evictionTimeout = evictionTimeout; this.currentTimeSupplier = currentTimeSupplier; + this.globalExpiryListener = expiryListener; } @Override @@ -135,7 +148,7 @@ public final class ExpiringSet<E> implements Set<E> { requireNonNull(e); purgeExpired(); ExpiringEntry<E> oldEntry = - storage.put(e, new ExpiringEntry<>(e, currentTimeSupplier.getAsLong() + evictionTimeout, null)); + storage.put(e, new ExpiringEntry<>(e, currentTimeSupplier.getAsLong() + evictionTimeout, globalExpiryListener)); return oldEntry == null; } @@ -148,7 +161,7 @@ public final class ExpiringSet<E> implements Set<E> { * @return 
{@code true} if this set did not already contain the specified element. */ public boolean add(E element, long expiry) { - return add(element, expiry, null); + return add(element, expiry, globalExpiryListener); } /** @@ -174,7 +187,8 @@ public final class ExpiringSet<E> implements Set<E> { return removedPrevious; } - ExpiringEntry<E> newEntry = new ExpiringEntry<>(element, expiry, expiryListener); + ExpiringEntry<E> newEntry = + new ExpiringEntry<>(element, expiry, expiryListener == null ? globalExpiryListener : expiryListener); ExpiringEntry<E> oldEntry = storage.put(element, newEntry); expiryQueue.offer(newEntry); if (oldEntry != null && oldEntry.expiry < Long.MAX_VALUE) { @@ -189,7 +203,8 @@ public final class ExpiringSet<E> implements Set<E> { purgeExpired(); boolean noOldElements = true; for (E element : c) { - ExpiringEntry<E> oldEntry = storage.put(element, new ExpiringEntry<>(element, Long.MAX_VALUE, null)); + ExpiringEntry<E> oldEntry = + storage.put(element, new ExpiringEntry<>(element, Long.MAX_VALUE, globalExpiryListener)); if (oldEntry != null) { noOldElements = false; } diff --git a/concurrent/src/test/java/org/apache/tuweni/concurrent/ExpiringMapTest.java b/concurrent/src/test/java/org/apache/tuweni/concurrent/ExpiringMapTest.java index df981974..afca86d8 100644 --- a/concurrent/src/test/java/org/apache/tuweni/concurrent/ExpiringMapTest.java +++ b/concurrent/src/test/java/org/apache/tuweni/concurrent/ExpiringMapTest.java @@ -20,6 +20,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import java.time.Instant; import java.util.Collections; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -32,7 +33,7 @@ class ExpiringMapTest { @BeforeEach void setup() { currentTime = Instant.now(); - map = new ExpiringMap<>(() -> currentTime.toEpochMilli(), Long.MAX_VALUE); + map = new ExpiringMap<>(() -> 
currentTime.toEpochMilli(), Long.MAX_VALUE, null); } @Test @@ -75,6 +76,21 @@ class ExpiringMapTest { assertNull(map.remove(1)); } + @Test + void addGlobalExpiryListener() { + AtomicReference<String> key = new AtomicReference<>(); + AtomicReference<String> value = new AtomicReference<>(); + ExpiringMap<String, String> listeningMap = + new ExpiringMap<>(() -> currentTime.toEpochMilli(), 1L, (String k, String v) -> { + key.set(k); + value.set(v); + }); + listeningMap.put("foo", "bar", -1); + assertEquals("foo", key.get()); + assertEquals("bar", value.get()); + assertEquals(0, listeningMap.size()); + } + @Test void itemIsExpiredAfterExpiry() { Instant futureTime = currentTime.plusSeconds(10); diff --git a/concurrent/src/test/java/org/apache/tuweni/concurrent/ExpiringSetTest.java b/concurrent/src/test/java/org/apache/tuweni/concurrent/ExpiringSetTest.java index 541367bd..c6a47066 100644 --- a/concurrent/src/test/java/org/apache/tuweni/concurrent/ExpiringSetTest.java +++ b/concurrent/src/test/java/org/apache/tuweni/concurrent/ExpiringSetTest.java @@ -17,6 +17,7 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import java.time.Instant; import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicReference; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; @@ -29,7 +30,7 @@ class ExpiringSetTest { @BeforeEach void setup() { currentTime = Instant.now(); - set = new ExpiringSet<>(Long.MAX_VALUE, () -> currentTime.toEpochMilli()); + set = new ExpiringSet<>(Long.MAX_VALUE, () -> currentTime.toEpochMilli(), null); } @Test @@ -62,6 +63,15 @@ class ExpiringSetTest { assertFalse(set.remove("foo")); } + @Test + void addGlobalExpiryListener() { + AtomicReference<String> key = new AtomicReference<>(); + ExpiringSet<String> listeningSet = new ExpiringSet<>(1L, () -> currentTime.toEpochMilli(), key::set); + listeningSet.add("foo", -1); + assertEquals("foo", key.get()); + assertEquals(0, listeningSet.size()); + } + 
@Test void itemIsMissingAfterExpiry() { Instant futureTime = currentTime.plusSeconds(10); --------------------------------------------------------------------- To unsubscribe, e-mail: commits-unsubscribe@tuweni.apache.org For additional commands, e-mail: commits-help@tuweni.apache.org