This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git
The following commit(s) were added to refs/heads/main by this push:
new ef00073439 ddata: Add wildcard prefix subscriptions to Replicator
(#2735)
ef00073439 is described below
commit ef00073439e2b48a569c4520fa03835fda9cfb48
Author: PJ Fanning <[email protected]>
AuthorDate: Wed Mar 18 10:14:53 2026 +0100
ddata: Add wildcard prefix subscriptions to Replicator (#2735)
* Initial plan
* ddata: Add wildcard prefix subscriptions to Replicator (port of
akka/akka-core#31731)
Co-authored-by: pjfanning <[email protected]>
* Port missing wildcard subscription changes from Akka PR #31731 (#11)
* Initial plan
* Add wildcard subscription improvements: override withId, serialization,
docs, and tests
Co-authored-by: pjfanning <[email protected]>
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
Co-authored-by: pjfanning <[email protected]>
* Fix post-rebase issues: sendExpiredIfMissing, expiryWildcards, isExpired
Duration.Zero bug
Co-authored-by: pjfanning <[email protected]>
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
Co-authored-by: pjfanning <[email protected]>
---
.../org/apache/pekko/cluster/ddata/GSet.scala | 5 +-
.../org/apache/pekko/cluster/ddata/Flag.scala | 5 +-
.../org/apache/pekko/cluster/ddata/GCounter.scala | 5 +-
.../scala/org/apache/pekko/cluster/ddata/Key.scala | 7 +
.../org/apache/pekko/cluster/ddata/LWWMap.scala | 5 +-
.../apache/pekko/cluster/ddata/LWWRegister.scala | 5 +-
.../org/apache/pekko/cluster/ddata/ORMap.scala | 5 +-
.../apache/pekko/cluster/ddata/ORMultiMap.scala | 5 +-
.../org/apache/pekko/cluster/ddata/ORSet.scala | 5 +-
.../org/apache/pekko/cluster/ddata/PNCounter.scala | 5 +-
.../apache/pekko/cluster/ddata/PNCounterMap.scala | 5 +-
.../apache/pekko/cluster/ddata/Replicator.scala | 85 ++++++--
.../ddata/protobuf/ReplicatedDataSerializer.scala | 5 +-
.../cluster/ddata/WildcardSubscribeSpec.scala | 215 +++++++++++++++++++++
.../ddata/ReplicatorWildcardSubscriptionSpec.scala | 126 ++++++++++++
.../protobuf/ReplicatedDataSerializerSpec.scala | 14 ++
docs/src/main/paradox/typed/distributed-data.md | 4 +
17 files changed, 478 insertions(+), 28 deletions(-)
diff --git
a/distributed-data/src/main/scala-2/org/apache/pekko/cluster/ddata/GSet.scala
b/distributed-data/src/main/scala-2/org/apache/pekko/cluster/ddata/GSet.scala
index d1804b5ca5..24540c08c9 100644
---
a/distributed-data/src/main/scala-2/org/apache/pekko/cluster/ddata/GSet.scala
+++
b/distributed-data/src/main/scala-2/org/apache/pekko/cluster/ddata/GSet.scala
@@ -104,4 +104,7 @@ object GSetKey {
}
@SerialVersionUID(1L)
-final case class GSetKey[A](_id: String) extends Key[GSet[A]](_id) with
ReplicatedDataSerialization
+final case class GSetKey[A](_id: String) extends Key[GSet[A]](_id) with
ReplicatedDataSerialization {
+ override def withId(newId: Key.KeyId): GSetKey[A] =
+ GSetKey(newId)
+}
diff --git
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/Flag.scala
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/Flag.scala
index 0d55a14195..95e2c31b7c 100644
--- a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/Flag.scala
+++ b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/Flag.scala
@@ -65,4 +65,7 @@ object FlagKey {
}
@SerialVersionUID(1L)
-final case class FlagKey(_id: String) extends Key[Flag](_id) with
ReplicatedDataSerialization
+final case class FlagKey(_id: String) extends Key[Flag](_id) with
ReplicatedDataSerialization {
+ override def withId(newId: Key.KeyId): FlagKey =
+ FlagKey(newId)
+}
diff --git
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/GCounter.scala
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/GCounter.scala
index 7080d99e17..eb7d229b1f 100644
---
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/GCounter.scala
+++
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/GCounter.scala
@@ -167,4 +167,7 @@ object GCounterKey {
}
@SerialVersionUID(1L)
-final case class GCounterKey(_id: String) extends Key[GCounter](_id) with
ReplicatedDataSerialization
+final case class GCounterKey(_id: String) extends Key[GCounter](_id) with
ReplicatedDataSerialization {
+ override def withId(newId: Key.KeyId): GCounterKey =
+ GCounterKey(newId)
+}
diff --git
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/Key.scala
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/Key.scala
index 1a3dca332d..7891b7aeab 100644
--- a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/Key.scala
+++ b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/Key.scala
@@ -13,6 +13,8 @@
package org.apache.pekko.cluster.ddata
+import org.apache.pekko.cluster.ddata.Key.UnspecificKey
+
object Key {
/**
@@ -24,6 +26,8 @@ object Key {
type KeyId = String
+ final case class UnspecificKey(_id: KeyId) extends Key[ReplicatedData](_id)
with ReplicatedDataSerialization
+
}
/**
@@ -36,6 +40,9 @@ object Key {
*/
abstract class Key[+T <: ReplicatedData](val id: Key.KeyId) extends
Serializable {
+ def withId(newId: Key.KeyId): Key[ReplicatedData] =
+ UnspecificKey(newId)
+
override final def equals(o: Any): Boolean = o match {
case k: Key[_] => id == k.id
case _ => false
diff --git
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/LWWMap.scala
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/LWWMap.scala
index 867977403e..8d029ec2f6 100644
---
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/LWWMap.scala
+++
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/LWWMap.scala
@@ -186,4 +186,7 @@ object LWWMapKey {
}
@SerialVersionUID(1L)
-final case class LWWMapKey[A, B](_id: String) extends Key[LWWMap[A, B]](_id)
with ReplicatedDataSerialization
+final case class LWWMapKey[A, B](_id: String) extends Key[LWWMap[A, B]](_id)
with ReplicatedDataSerialization {
+ override def withId(newId: Key.KeyId): LWWMapKey[A, B] =
+ LWWMapKey(newId)
+}
diff --git
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/LWWRegister.scala
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/LWWRegister.scala
index 0f70c4620a..9224591d4f 100644
---
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/LWWRegister.scala
+++
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/LWWRegister.scala
@@ -202,4 +202,7 @@ object LWWRegisterKey {
}
@SerialVersionUID(1L)
-final case class LWWRegisterKey[A](_id: String) extends
Key[LWWRegister[A]](_id) with ReplicatedDataSerialization
+final case class LWWRegisterKey[A](_id: String) extends
Key[LWWRegister[A]](_id) with ReplicatedDataSerialization {
+ override def withId(newId: Key.KeyId): LWWRegisterKey[A] =
+ LWWRegisterKey(newId)
+}
diff --git
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/ORMap.scala
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/ORMap.scala
index 3e3cffadd8..1070ee3e63 100644
--- a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/ORMap.scala
+++ b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/ORMap.scala
@@ -562,4 +562,7 @@ object ORMapKey {
@SerialVersionUID(1L)
final case class ORMapKey[A, B <: ReplicatedData](_id: String)
extends Key[ORMap[A, B]](_id)
- with ReplicatedDataSerialization
+ with ReplicatedDataSerialization {
+ override def withId(newId: Key.KeyId): ORMapKey[A, B] =
+ ORMapKey(newId)
+}
diff --git
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/ORMultiMap.scala
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/ORMultiMap.scala
index 6a682181e8..a582d84d3e 100644
---
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/ORMultiMap.scala
+++
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/ORMultiMap.scala
@@ -328,4 +328,7 @@ object ORMultiMapKey {
}
@SerialVersionUID(1L)
-final case class ORMultiMapKey[A, B](_id: String) extends Key[ORMultiMap[A,
B]](_id) with ReplicatedDataSerialization
+final case class ORMultiMapKey[A, B](_id: String) extends Key[ORMultiMap[A,
B]](_id) with ReplicatedDataSerialization {
+ override def withId(newId: Key.KeyId): ORMultiMapKey[A, B] =
+ ORMultiMapKey(newId)
+}
diff --git
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/ORSet.scala
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/ORSet.scala
index d618face23..59212fa6a0 100644
--- a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/ORSet.scala
+++ b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/ORSet.scala
@@ -574,4 +574,7 @@ object ORSetKey {
}
@SerialVersionUID(1L)
-final case class ORSetKey[A](_id: String) extends Key[ORSet[A]](_id) with
ReplicatedDataSerialization
+final case class ORSetKey[A](_id: String) extends Key[ORSet[A]](_id) with
ReplicatedDataSerialization {
+ override def withId(newId: Key.KeyId): ORSetKey[A] =
+ ORSetKey(newId)
+}
diff --git
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/PNCounter.scala
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/PNCounter.scala
index 36ff6495fb..8873db8e62 100644
---
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/PNCounter.scala
+++
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/PNCounter.scala
@@ -217,4 +217,7 @@ object PNCounterKey {
}
@SerialVersionUID(1L)
-final case class PNCounterKey(_id: String) extends Key[PNCounter](_id) with
ReplicatedDataSerialization
+final case class PNCounterKey(_id: String) extends Key[PNCounter](_id) with
ReplicatedDataSerialization {
+ override def withId(newId: Key.KeyId): PNCounterKey =
+ PNCounterKey(newId)
+}
diff --git
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/PNCounterMap.scala
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/PNCounterMap.scala
index f615081465..56d96bf0f9 100644
---
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/PNCounterMap.scala
+++
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/PNCounterMap.scala
@@ -188,4 +188,7 @@ object PNCounterMapKey {
}
@SerialVersionUID(1L)
-final case class PNCounterMapKey[A](_id: String) extends
Key[PNCounterMap[A]](_id) with ReplicatedDataSerialization
+final case class PNCounterMapKey[A](_id: String) extends
Key[PNCounterMap[A]](_id) with ReplicatedDataSerialization {
+ override def withId(newId: Key.KeyId): PNCounterMapKey[A] =
+ PNCounterMapKey(newId)
+}
diff --git
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/Replicator.scala
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/Replicator.scala
index bb94ca7e8b..2721108a36 100644
---
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/Replicator.scala
+++
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/Replicator.scala
@@ -567,6 +567,10 @@ object Replicator {
* when the value of the given `key` is changed. Current value is also
* sent as a [[Changed]] message to a new subscriber.
*
+ * In addition to subscribing to individual keys, it is possible to subscribe to all keys with a given prefix
+ * by using a `*` at the end of the key `id`, for example `GCounterKey("counter-*")`. Notifications are sent
+ * for all matching keys, including keys that are added later.
+ *
* Subscribers will be notified periodically with the configured
`notify-subscribers-interval`,
* and it is also possible to send an explicit `FlushChanges` message to
* the `Replicator` to notify the subscribers immediately.
@@ -1310,14 +1314,14 @@ final class Replicator(settings: ReplicatorSettings)
extends Actor with ActorLog
val serializer =
SerializationExtension(context.system).serializerFor(classOf[DataEnvelope])
val maxPruningDisseminationNanos = maxPruningDissemination.toNanos
- val expiryWildcards = settings.expiryKeys.collect { case (k, v) if
k.endsWith("*") => k.dropRight(1) -> v }
+ val expiryWildcards = settings.expiryKeys.collect { case (k, v) if
isWildcard(k) => dropWildcard(k) -> v }
val expiryEnabled: Boolean = settings.expiryKeys.nonEmpty
// updated on the gossip tick to avoid too many calls to
`currentTimeMillis()`
private var currentUsedTimestamp = if (expiryEnabled)
System.currentTimeMillis() else 0L
val hasDurableKeys = settings.durableKeys.nonEmpty
- val durable = settings.durableKeys.filterNot(_.endsWith("*"))
- val durableWildcards = settings.durableKeys.collect { case k if
k.endsWith("*") => k.dropRight(1) }
+ val durable = settings.durableKeys.filterNot(isWildcard)
+ val durableWildcards = settings.durableKeys.collect { case k if
isWildcard(k) => dropWildcard(k) }
val durableStore: ActorRef =
if (hasDurableKeys) {
val props = settings.durableStoreProps match {
@@ -1414,6 +1418,8 @@ final class Replicator(settings: ReplicatorSettings)
extends Actor with ActorLog
@nowarn("msg=deprecated")
val newSubscribers = new mutable.HashMap[KeyId, mutable.Set[ActorRef]] with
mutable.MultiMap[KeyId, ActorRef]
var subscriptionKeys = Map.empty[KeyId, KeyR]
+ @nowarn("msg=deprecated")
+ val wildcardSubscribers = new mutable.HashMap[KeyId, mutable.Set[ActorRef]]
with mutable.MultiMap[KeyId, ActorRef]
// To be able to do efficient stashing we use this field instead of sender().
// Using internal buffer instead of Stash to avoid the overhead of the Stash
mailbox.
@@ -1883,7 +1889,7 @@ final class Replicator(settings: ReplicatorSettings)
extends Actor with ActorLog
}
val dig =
- if (subscribers.contains(key) && !changed.contains(key)) {
+ if (hasSubscriber(key) && !changed.contains(key)) {
val oldDigest = getDigest(key)
val (dig, payloadSize) = digest(newEnvelope)
payloadSizeAggregator.updatePayloadSize(key, payloadSize)
@@ -1963,7 +1969,12 @@ final class Replicator(settings: ReplicatorSettings)
extends Actor with ActorLog
}
def isExpired(key: KeyId, timestamp: Timestamp, now: Long): Boolean = {
- expiryEnabled && timestamp != 0L && timestamp <= now -
getExpiryDuration(key).toMillis
+ if (expiryEnabled && timestamp != 0L) {
+ val expiryDuration = getExpiryDuration(key)
+ expiryDuration != Duration.Zero && timestamp <= now -
expiryDuration.toMillis
+ } else {
+ false
+ }
}
def updateUsedTimestamp(key: KeyId, timestamp: Timestamp): Unit = {
@@ -1993,8 +2004,15 @@ final class Replicator(settings: ReplicatorSettings)
extends Actor with ActorLog
@nowarn("msg=deprecated")
def receiveFlushChanges(): Unit = {
- def notify(keyId: KeyId, subs: mutable.Set[ActorRef],
sendExpiredIfMissing: Boolean): Unit = {
- val key = subscriptionKeys(keyId)
+ def notify(keyId: KeyId, subs: Iterator[ActorRef], sendExpiredIfMissing:
Boolean): Unit = {
+ val key = subscriptionKeys.get(keyId) match {
+ case Some(r) => r
+ case None =>
+ subscriptionKeys
+ .collectFirst { case (k, r) if isWildcard(k) &&
keyId.startsWith(dropWildcard(k)) => r.withId(keyId) }
+ .getOrElse(throw new IllegalStateException(s"Subscription
notification of [$keyId], but no matching " +
+ s"subscription key in
[${subscriptionKeys.keysIterator.mkString(", ")}]"))
+ }
getData(keyId) match {
case Some(envelope) =>
val msg = if (envelope.data == DeletedData) Deleted(key) else
Changed(key)(envelope.data)
@@ -2009,17 +2027,25 @@ final class Replicator(settings: ReplicatorSettings)
extends Actor with ActorLog
}
}
- if (subscribers.nonEmpty) {
- for (key <- changed; if subscribers.contains(key); subs <-
subscribers.get(key))
- notify(key, subs, sendExpiredIfMissing = true)
+ if (subscribers.nonEmpty || wildcardSubscribers.nonEmpty) {
+ changed.foreach { key =>
+ if (hasSubscriber(key)) notify(key, getSubscribersIterator(key),
sendExpiredIfMissing = true)
+ }
}
// Changed event is sent to new subscribers even though the key has not
changed,
// i.e. send current value. Expired is not sent to new subscribers as the
first event.
if (newSubscribers.nonEmpty) {
for ((key, subs) <- newSubscribers) {
- notify(key, subs, sendExpiredIfMissing = false)
- subs.foreach { subscribers.addBinding(key, _) }
+ if (isWildcard(key)) {
+
dataEntries.keysIterator.filter(_.startsWith(dropWildcard(key))).foreach {
matchingKey =>
+ notify(matchingKey, subs.iterator, sendExpiredIfMissing = false)
+ }
+ subs.foreach { wildcardSubscribers.addBinding(dropWildcard(key), _) }
+ } else {
+ notify(key, subs.iterator, sendExpiredIfMissing = false)
+ subs.foreach { subscribers.addBinding(key, _) }
+ }
}
newSubscribers.clear()
}
@@ -2331,18 +2357,39 @@ final class Replicator(settings: ReplicatorSettings)
extends Actor with ActorLog
}
def receiveUnsubscribe(key: KeyR, subscriber: ActorRef): Unit = {
- subscribers.removeBinding(key.id, subscriber)
+ if (isWildcard(key.id))
wildcardSubscribers.removeBinding(dropWildcard(key.id), subscriber)
+ else subscribers.removeBinding(key.id, subscriber)
newSubscribers.removeBinding(key.id, subscriber)
if (!hasSubscriber(subscriber))
context.unwatch(subscriber)
- if (!subscribers.contains(key.id) && !newSubscribers.contains(key.id))
+ if (!hasSubscriber(key.id) && !newSubscribers.contains(key.id))
subscriptionKeys -= key.id
}
def hasSubscriber(subscriber: ActorRef): Boolean =
subscribers.exists { case (_, s) => s.contains(subscriber) } ||
+ wildcardSubscribers.exists { case (_, s) => s.contains(subscriber) } ||
newSubscribers.exists { case (_, s) => s.contains(subscriber) }
+ private def hasSubscriber(keyId: KeyId): Boolean =
+ subscribers.contains(keyId) ||
+ (wildcardSubscribers.nonEmpty && wildcardSubscribers.exists { case (k, _)
=> keyId.startsWith(k) })
+
+ /**
+  * Subscribers to notify for `keyId`: those registered for the exact key, followed by the
+  * subscribers of every wildcard prefix that `keyId` starts with.
+  *
+  * A subscriber registered under several matching subscriptions is returned once per
+  * subscription; no de-duplication is done.
+  */
+ private def getSubscribersIterator(keyId: KeyId): Iterator[ActorRef] = {
+   val exactSubscribers = subscribers.get(keyId).map(_.iterator).getOrElse(Iterator.empty)
+   if (wildcardSubscribers.isEmpty) exactSubscribers
+   else
+     // Overlapping prefixes such as "counter-" and "count" can both match the same key, so
+     // collect every matching entry instead of stopping at the first one found.
+     exactSubscribers ++
+     wildcardSubscribers.iterator
+       .collect { case (prefix, subs) if keyId.startsWith(prefix) => subs }
+       .flatMap(_.iterator)
+ }
+
+ private def isWildcard(keyId: KeyId): Boolean = keyId.endsWith("*")
+
+ private def dropWildcard(keyId: KeyId): KeyId = keyId.dropRight(1)
+
def receiveTerminated(ref: ActorRef): Unit = {
if (ref == durableStore) {
log.error("Stopping distributed-data Replicator because durable store
terminated")
@@ -2352,13 +2399,17 @@ final class Replicator(settings: ReplicatorSettings)
extends Actor with ActorLog
keys1.foreach { key =>
subscribers.removeBinding(key, ref)
}
- val keys2 = newSubscribers.collect { case (k, s) if s.contains(ref) => k
}
+ val keys2 = wildcardSubscribers.collect { case (k, s) if s.contains(ref)
=> k }
keys2.foreach { key =>
+ wildcardSubscribers.removeBinding(key, ref)
+ }
+ val keys3 = newSubscribers.collect { case (k, s) if s.contains(ref) => k
}
+ keys3.foreach { key =>
newSubscribers.removeBinding(key, ref)
}
- (keys1 ++ keys2).foreach { key =>
- if (!subscribers.contains(key) && !newSubscribers.contains(key))
+ (keys1 ++ keys3).foreach { key =>
+ if (!hasSubscriber(key) && !newSubscribers.contains(key))
subscriptionKeys -= key
}
}
diff --git
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/protobuf/ReplicatedDataSerializer.scala
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/protobuf/ReplicatedDataSerializer.scala
index 37b33d6f31..6496dab09f 100644
---
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/protobuf/ReplicatedDataSerializer.scala
+++
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/protobuf/ReplicatedDataSerializer.scala
@@ -304,6 +304,7 @@ class ReplicatedDataSerializer(val system:
ExtendedActorSystem)
private val ORMultiMapManifest = "K"
private val ORMultiMapKeyManifest = "k"
private val VersionVectorManifest = "L"
+ private val UnspecificKeyManifest = "m"
private val fromBinaryMap = collection.immutable.HashMap[String, Array[Byte]
=> AnyRef](
GSetManifest -> gsetFromBinary,
@@ -336,7 +337,8 @@ class ReplicatedDataSerializer(val system:
ExtendedActorSystem)
ORMapKeyManifest -> (bytes => ORMapKey(keyIdFromBinary(bytes))),
LWWMapKeyManifest -> (bytes => LWWMapKey(keyIdFromBinary(bytes))),
PNCounterMapKeyManifest -> (bytes =>
PNCounterMapKey(keyIdFromBinary(bytes))),
- ORMultiMapKeyManifest -> (bytes => ORMultiMapKey(keyIdFromBinary(bytes))))
+ ORMultiMapKeyManifest -> (bytes => ORMultiMapKey(keyIdFromBinary(bytes))),
+ UnspecificKeyManifest -> (bytes =>
Key.UnspecificKey(keyIdFromBinary(bytes))))
override def manifest(obj: AnyRef): String = obj match {
case _: ORSet[_] => ORSetManifest
@@ -368,6 +370,7 @@ class ReplicatedDataSerializer(val system:
ExtendedActorSystem)
case _: LWWMapKey[_, _] => LWWMapKeyManifest
case _: PNCounterMapKey[_] => PNCounterMapKeyManifest
case _: ORMultiMapKey[_, _] => ORMultiMapKeyManifest
+ case _: Key.UnspecificKey => UnspecificKeyManifest
case _: ORSet.DeltaGroup[_] => ORSetDeltaGroupManifest
case _: ORMap.DeltaGroup[_, _] => ORMapDeltaGroupManifest
diff --git
a/distributed-data/src/multi-jvm/scala/org/apache/pekko/cluster/ddata/WildcardSubscribeSpec.scala
b/distributed-data/src/multi-jvm/scala/org/apache/pekko/cluster/ddata/WildcardSubscribeSpec.scala
new file mode 100644
index 0000000000..4bd29743d7
--- /dev/null
+++
b/distributed-data/src/multi-jvm/scala/org/apache/pekko/cluster/ddata/WildcardSubscribeSpec.scala
@@ -0,0 +1,215 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2009-2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.cluster.ddata
+
+import scala.concurrent.duration._
+
+import com.typesafe.config.ConfigFactory
+
+import org.apache.pekko.cluster.Cluster
+import org.apache.pekko.remote.testconductor.RoleName
+import org.apache.pekko.remote.testkit.MultiNodeConfig
+import org.apache.pekko.remote.testkit.MultiNodeSpec
+import org.apache.pekko.testkit._
+
+object WildcardSubscribeSpec extends MultiNodeConfig {
+ val first = role("first")
+ val second = role("second")
+
+ commonConfig(ConfigFactory.parseString("""
+ pekko.loglevel = INFO
+ pekko.actor.provider = "cluster"
+ pekko.cluster.distributed-data {
+ gossip-interval = 500 millis
+ notify-subscribers-interval = 100 millis
+ expire-keys-after-inactivity {
+ "expiry-*" = 2 seconds
+ }
+ }
+ """))
+
+}
+
+class WildcardSubscribeSpecMultiJvmNode1 extends WildcardSubscribeSpec
+class WildcardSubscribeSpecMultiJvmNode2 extends WildcardSubscribeSpec
+
+class WildcardSubscribeSpec extends MultiNodeSpec(WildcardSubscribeSpec) with
STMultiNodeSpec with ImplicitSender {
+ import WildcardSubscribeSpec._
+ import Replicator._
+
+ override def initialParticipants: Int = roles.size
+
+ private val cluster = Cluster(system)
+ private implicit val selfUniqueAddress: SelfUniqueAddress =
DistributedData(system).selfUniqueAddress
+ private val replicator =
system.actorOf(Replicator.props(ReplicatorSettings(system)), "replicator")
+
+ private val KeyA = GCounterKey("counter-A")
+ private val KeyB = GCounterKey("counter-B")
+ private val KeyOtherA = GCounterKey("other-A")
+ private val KeyExpiryA = GCounterKey("expiry-A")
+
+ def join(from: RoleName, to: RoleName): Unit = {
+ runOn(from) {
+ cluster.join(node(to).address)
+ }
+ enterBarrier(from.name + "-joined")
+ }
+
+ "Replicator wildcard subscriptions" must {
+
+ "notify changed entry" in {
+ join(first, first)
+
+ runOn(first) {
+ val subscriberProbe = TestProbe()
+ replicator ! Subscribe(GCounterKey("counter-*"), subscriberProbe.ref)
+
+ replicator ! Update(KeyA, GCounter.empty, WriteLocal)(_ :+ 1)
+ expectMsgType[UpdateSuccess[_]]
+ val chg1 = subscriberProbe.expectMsgType[Changed[GCounter]]
+ chg1.key should ===(KeyA)
+ chg1.key.getClass should ===(KeyA.getClass)
+ chg1.get(KeyA).value should ===(1)
+
+ replicator ! Update(KeyA, GCounter.empty, WriteLocal)(_ :+ 1)
+ expectMsgType[UpdateSuccess[_]]
+ val chg2 = subscriberProbe.expectMsgType[Changed[GCounter]]
+ chg2.key should ===(KeyA)
+ chg2.get(KeyA).value should ===(2)
+
+ replicator ! Update(KeyB, GCounter.empty, WriteLocal)(_ :+ 1)
+ expectMsgType[UpdateSuccess[_]]
+ val chg3 = subscriberProbe.expectMsgType[Changed[GCounter]]
+ chg3.key should ===(KeyB)
+ chg3.get(KeyB).value should ===(1)
+
+ replicator ! Update(KeyOtherA, GCounter.empty, WriteLocal)(_ :+ 17)
+ expectMsgType[UpdateSuccess[_]]
+ subscriberProbe.expectNoMessage(200.millis)
+
+ // a few more subscribers
+ val subscriberProbe2 = TestProbe()
+ replicator ! Subscribe(GCounterKey("counter-*"), subscriberProbe2.ref)
+ val subscriberProbeA = TestProbe()
+ replicator ! Subscribe(KeyA, subscriberProbeA.ref)
+ val subscriberProbeOther = TestProbe()
+ replicator ! Subscribe(GCounterKey("other-*"),
subscriberProbeOther.ref)
+ subscriberProbe.expectNoMessage(200.millis)
+ subscriberProbe2.receiveN(2).foreach {
+ case chg: Changed[GCounter] @unchecked =>
+ if (chg.key == KeyA)
+ chg.get(KeyA).value should ===(2)
+ else if (chg.key == KeyB)
+ chg.get(KeyB).value should ===(1)
+ else
+ fail(s"unexpected change ${chg.key}")
+ case other =>
+ fail(s"Unexpected $other")
+ }
+ subscriberProbe2.expectNoMessage()
+ subscriberProbeA.expectMsgType[Changed[GCounter]].get(KeyA).value
should ===(2)
+
subscriberProbeOther.expectMsgType[Changed[GCounter]].get(KeyOtherA).value
should ===(17)
+
+ replicator ! Update(KeyB, GCounter.empty, WriteLocal)(_ :+ 10)
+ expectMsgType[UpdateSuccess[_]]
+ val chg4 = subscriberProbe.expectMsgType[Changed[GCounter]]
+ chg4.key should ===(KeyB)
+ chg4.get(KeyB).value should ===(11)
+ val chg5 = subscriberProbe2.expectMsgType[Changed[GCounter]]
+ chg5.key should ===(KeyB)
+ chg5.get(KeyB).value should ===(11)
+
+ // unsubscribe
+ replicator ! Unsubscribe(GCounterKey("counter-*"), subscriberProbe.ref)
+ replicator ! Update(KeyB, GCounter.empty, WriteLocal)(_ :+ 5)
+ expectMsgType[UpdateSuccess[_]]
+ subscriberProbe.expectNoMessage(200.millis)
+ val chg6 = subscriberProbe2.expectMsgType[Changed[GCounter]]
+ chg6.key should ===(KeyB)
+ chg6.get(KeyB).value should ===(16)
+ }
+
+ enterBarrier("done-1")
+ }
+
+ "notify expired entry" in {
+ runOn(first) {
+ val subscriberProbe = TestProbe()
+ replicator ! Subscribe(GCounterKey("expiry-*"), subscriberProbe.ref)
+
+ replicator ! Update(KeyExpiryA, GCounter.empty, WriteLocal)(_ :+ 1)
+ expectMsgType[UpdateSuccess[_]]
+ subscriberProbe.expectMsgType[Changed[GCounter]]
+
+ replicator ! Get(KeyExpiryA, ReadLocal)
+ expectMsgType[GetSuccess[GCounter]].get(KeyExpiryA).value should ===(1)
+
+ expectNoMessage(5.seconds)
+ replicator ! Get(KeyExpiryA, ReadLocal)
+ expectMsg(NotFound(KeyExpiryA, None))
+ subscriberProbe.expectMsg(Expired[GCounter](KeyExpiryA))
+
+ // same key can be used again
+ replicator ! Update(KeyExpiryA, GCounter.empty, WriteLocal)(_ :+ 2)
+ expectMsgType[UpdateSuccess[_]]
+ subscriberProbe.expectMsgType[Changed[GCounter]]
+ replicator ! Get(KeyExpiryA, ReadLocal)
+ expectMsgType[GetSuccess[GCounter]].get(KeyExpiryA).value should ===(2)
+ }
+
+ enterBarrier("done-2")
+ }
+
+ // Two-node scenario: `first` updates KeyOtherA before and after `second` joins, while
+ // `second` subscribes to the "other-*" prefix before joining and checks both notifications.
+ // The barriers keep the two nodes in lock-step between the steps.
+ "notify when changed from another node" in {
+   runOn(first) {
+     replicator ! Update(KeyOtherA, GCounter.empty, WriteLocal)(_ :+ 1)
+     expectMsgType[UpdateSuccess[_]]
+     enterBarrier("updated-1")
+
+     // counterpart of the barrier entered by `join(second, first)` on the second node
+     enterBarrier("second-joined")
+
+     enterBarrier("received-1")
+
+     replicator ! Update(KeyOtherA, GCounter.empty, WriteLocal)(_ :+ 1)
+     expectMsgType[UpdateSuccess[_]]
+     enterBarrier("updated-2")
+
+     enterBarrier("received-2")
+   }
+   runOn(second) {
+     enterBarrier("updated-1")
+
+     // it's possible to subscribe before join
+     val subscriberProbe = TestProbe()
+     replicator ! Subscribe(GCounterKey("other-*"), subscriberProbe.ref)
+
+     join(second, first)
+
+     val chg1 = subscriberProbe.expectMsgType[Changed[GCounter]](10.seconds)
+     chg1.key should ===(KeyOtherA)
+     chg1.get(KeyOtherA).value should ===(18) // it was also incremented to 17 in earlier test
+     enterBarrier("received-1")
+
+     enterBarrier("updated-2")
+
+     val chg2 = subscriberProbe.expectMsgType[Changed[GCounter]]
+     chg2.key should ===(KeyOtherA)
+     chg2.get(KeyOtherA).value should ===(19)
+     enterBarrier("received-2")
+   }
+
+   // Final barrier belongs inside the test body (like "done-1" and "done-2"); outside the
+   // `in` block it would run while the suite is being registered, not when the test runs.
+   enterBarrier("done-3")
+ }
+ }
+}
diff --git
a/distributed-data/src/test/scala/org/apache/pekko/cluster/ddata/ReplicatorWildcardSubscriptionSpec.scala
b/distributed-data/src/test/scala/org/apache/pekko/cluster/ddata/ReplicatorWildcardSubscriptionSpec.scala
new file mode 100644
index 0000000000..5cc8d10993
--- /dev/null
+++
b/distributed-data/src/test/scala/org/apache/pekko/cluster/ddata/ReplicatorWildcardSubscriptionSpec.scala
@@ -0,0 +1,126 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+package org.apache.pekko.cluster.ddata
+
+import scala.concurrent.duration._
+
+import org.apache.pekko
+import pekko.actor.ActorSystem
+import pekko.cluster.ddata.Replicator._
+import pekko.testkit.ImplicitSender
+import pekko.testkit.TestKit
+import pekko.testkit.TestProbe
+
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.matchers.should.Matchers
+import org.scalatest.wordspec.AnyWordSpecLike
+
+import com.typesafe.config.ConfigFactory
+
+object ReplicatorWildcardSubscriptionSpec {
+ val config = ConfigFactory.parseString("""
+ pekko.actor.provider = "cluster"
+ pekko.remote.classic.netty.tcp.port = 0
+ pekko.remote.artery.canonical.port = 0
+ pekko.remote.artery.canonical.hostname = 127.0.0.1
+ pekko.cluster.distributed-data.notify-subscribers-interval = 100ms
+ """)
+}
+
+class ReplicatorWildcardSubscriptionSpec(_system: ActorSystem)
+ extends TestKit(_system)
+ with AnyWordSpecLike
+ with Matchers
+ with BeforeAndAfterAll
+ with ImplicitSender {
+
+ def this() =
+ this(ActorSystem("ReplicatorWildcardSubscriptionSpec",
ReplicatorWildcardSubscriptionSpec.config))
+
+ override def afterAll(): Unit = shutdown(system)
+
+ implicit val selfUniqueAddress: SelfUniqueAddress =
DistributedData(system).selfUniqueAddress
+ val replicator = DistributedData(system).replicator
+
+ "Replicator wildcard subscriptions" must {
+
+ "notify subscriber for keys matching the wildcard prefix" in {
+ val KeyA1 = GCounterKey("notif-counter-a1")
+ val KeyA2 = GCounterKey("notif-counter-a2")
+ val KeyB = GCounterKey("notif-other-counter")
+ val WildcardKey = GCounterKey("notif-counter-*")
+
+ val probe = TestProbe()
+ replicator ! Subscribe(WildcardKey, probe.ref)
+
+ // Update a matching key
+ replicator ! Update(KeyA1, GCounter.empty, WriteLocal)(_ :+ 1)
+ expectMsgType[UpdateSuccess[_]]
+
+ val changed1 = probe.expectMsgType[Changed[GCounter]](5.seconds)
+ changed1.key.id should ===("notif-counter-a1")
+ changed1.get(changed1.key).value should ===(1)
+
+ // Update another matching key
+ replicator ! Update(KeyA2, GCounter.empty, WriteLocal)(_ :+ 2)
+ expectMsgType[UpdateSuccess[_]]
+
+ val changed2 = probe.expectMsgType[Changed[GCounter]](5.seconds)
+ changed2.key.id should ===("notif-counter-a2")
+ changed2.get(changed2.key).value should ===(2)
+
+ // Update a non-matching key - no notification expected
+ replicator ! Update(KeyB, GCounter.empty, WriteLocal)(_ :+ 1)
+ expectMsgType[UpdateSuccess[_]]
+
+ probe.expectNoMessage(500.millis)
+ }
+
+ "send current value to new wildcard subscriber for existing matching keys"
in {
+ val KeyA = GCounterKey("current-counter-a")
+ val WildcardKey = GCounterKey("current-counter-*")
+
+ replicator ! Update(KeyA, GCounter.empty, WriteLocal)(_ :+ 10)
+ expectMsgType[UpdateSuccess[_]]
+
+ // Subscribe after the key already exists
+ val subscribeProbe = TestProbe()
+ replicator ! Subscribe(WildcardKey, subscribeProbe.ref)
+
+ // Should receive current value for existing matching key
+ val changed = subscribeProbe.expectMsgType[Changed[GCounter]](5.seconds)
+ changed.key.id should ===("current-counter-a")
+ changed.get(changed.key).value should ===(10)
+ }
+
+ "unsubscribe wildcard subscriber" in {
+ val KeyA = GCounterKey("unsub-counter-a")
+ val WildcardKey = GCounterKey("unsub-counter-*")
+
+ replicator ! Update(KeyA, GCounter.empty, WriteLocal)(_ :+ 1)
+ expectMsgType[UpdateSuccess[_]]
+
+ val probe = TestProbe()
+ replicator ! Subscribe(WildcardKey, probe.ref)
+
+ // Receive the initial value
+ probe.expectMsgType[Changed[GCounter]](5.seconds)
+
+ // Unsubscribe
+ replicator ! Unsubscribe(WildcardKey, probe.ref)
+
+ // Update a matching key - no notification expected after unsubscribe
+ replicator ! Update(KeyA, GCounter.empty, WriteLocal)(_ :+ 100)
+ expectMsgType[UpdateSuccess[_]]
+
+ probe.expectNoMessage(500.millis)
+ }
+ }
+}
diff --git
a/distributed-data/src/test/scala/org/apache/pekko/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala
b/distributed-data/src/test/scala/org/apache/pekko/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala
index 481b199a0d..3c9043c8b0 100644
---
a/distributed-data/src/test/scala/org/apache/pekko/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala
+++
b/distributed-data/src/test/scala/org/apache/pekko/cluster/ddata/protobuf/ReplicatedDataSerializerSpec.scala
@@ -96,6 +96,7 @@ class ReplicatedDataSerializerSpec
checkSameContent(GSet() + "a" + "b", GSet() + "b" + "a")
checkSameContent(GSet() + ref1 + ref2 + ref3, GSet() + ref2 + ref1 +
ref3)
checkSameContent(GSet() + ref1 + ref2 + ref3, GSet() + ref3 + ref2 +
ref1)
+ checkSerialization(GSetKey[String]("id"))
}
"serialize ORSet" in {
@@ -125,6 +126,7 @@ class ReplicatedDataSerializerSpec
val s5 = ORSet().add(address1, "a").add(address2, ref1)
val s6 = ORSet().add(address2, ref1).add(address1, "a")
checkSameContent(s5.merge(s6), s6.merge(s5))
+ checkSerialization(ORSetKey[String]("id"))
}
"serialize ORSet with ActorRef message sent between two systems" in {
@@ -187,6 +189,7 @@ class ReplicatedDataSerializerSpec
"serialize Flag" in {
checkSerialization(Flag())
checkSerialization(Flag().switchOn)
+ checkSerialization(FlagKey("id"))
}
"serialize LWWRegister" in {
@@ -194,6 +197,7 @@ class ReplicatedDataSerializerSpec
checkSerialization(
LWWRegister(address1, "value2", LWWRegister.defaultClock[String])
.withValue(address2, "value3", LWWRegister.defaultClock[String]))
+ checkSerialization(LWWRegisterKey[String]("id"))
}
"serialize GCounter" in {
@@ -207,6 +211,7 @@ class ReplicatedDataSerializerSpec
checkSameContent(
GCounter().increment(address1, 2).increment(address3, 5),
GCounter().increment(address3, 5).increment(address1, 2))
+ checkSerialization(GCounterKey("id"))
}
"serialize PNCounter" in {
@@ -225,6 +230,7 @@ class ReplicatedDataSerializerSpec
checkSameContent(
PNCounter().increment(address1, 2).decrement(address1,
1).increment(address3, 5),
PNCounter().increment(address3, 5).increment(address1,
2).decrement(address1, 1))
+ checkSerialization(PNCounterKey("id"))
}
"serialize ORMap" in {
@@ -233,6 +239,7 @@ class ReplicatedDataSerializerSpec
checkSerialization(ORMap().put(address1, 1L, GSet() + "A"))
// use Flag for this test as object key because it is serializable
checkSerialization(ORMap().put(address1, Flag(), GSet() + "A"))
+ checkSerialization(ORMapKey[UniqueAddress, GSet[String]]("id"))
}
"serialize ORMap delta" in {
@@ -270,6 +277,7 @@ class ReplicatedDataSerializerSpec
LWWMap()
.put(address1, "a", "value1", LWWRegister.defaultClock[Any])
.put(address2, "b", 17, LWWRegister.defaultClock[Any]))
+ checkSerialization(LWWMapKey[UniqueAddress, String]("id"))
}
"serialize PNCounterMap" in {
@@ -280,6 +288,7 @@ class ReplicatedDataSerializerSpec
checkSerialization(PNCounterMap().increment(address1, Flag(), 3))
checkSerialization(
PNCounterMap().increment(address1, "a", 3).decrement(address2, "a",
2).increment(address2, "b", 5))
+ checkSerialization(PNCounterMapKey[String]("id"))
}
"serialize ORMultiMap" in {
@@ -303,6 +312,7 @@ class ReplicatedDataSerializerSpec
val m3 = ORMultiMap.empty[String, String].addBinding(address1, "a", "A1")
val d3 = m3.resetDelta.addBinding(address1, "a",
"A2").addBinding(address1, "a", "A3").delta.get
checkSerialization(d3)
+ checkSerialization(ORMultiMapKey[String, String]("id"))
}
"serialize ORMultiMap withValueDeltas" in {
@@ -340,5 +350,9 @@ class ReplicatedDataSerializerSpec
checkSameContent(v1.merge(v2), v2.merge(v1))
}
+ "serialize UnspecificKey" in {
+ checkSerialization(Key.UnspecificKey("id"))
+ }
+
}
}
diff --git a/docs/src/main/paradox/typed/distributed-data.md b/docs/src/main/paradox/typed/distributed-data.md
index 0013e4e52c..84a4104bad 100644
--- a/docs/src/main/paradox/typed/distributed-data.md
+++ b/docs/src/main/paradox/typed/distributed-data.md
@@ -176,6 +176,10 @@ configurable
`pekko.cluster.distributed-data.notify-subscribers-interval`.
The subscriber is automatically unsubscribed if the subscriber is terminated.
A subscriber can
also be de-registered with the `replicatorAdapter.unsubscribe(key)` function.
+In addition to subscribing to individual keys, it is possible to subscribe to
all keys with a given prefix
+by using a `*` at the end of the key `id`, for example
`GCounterKey("counter-*")`. Notifications will be
+sent for all matching keys, including keys that are added later.
+
### Delete
A data entry can be deleted by sending a `Replicator.Delete` message to the
local
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]