This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/main by this push:
     new 15aeee6df9 Port CRDT key expiry from akka-core #31721 (#2733)
15aeee6df9 is described below

commit 15aeee6df9dfb6836aac3bd54876300c7b91ffad
Author: PJ Fanning <[email protected]>
AuthorDate: Mon Mar 16 20:44:56 2026 +0100

    Port CRDT key expiry from akka-core #31721 (#2733)
    
    * Initial plan
    
    * Port CRDT expiry feature from akka-core PR #31721
    
    Co-authored-by: pjfanning <[email protected]>
    
    * Fix code review issues: dbDelete error message, 0L consistency, isExpired 
now param
    
    Co-authored-by: pjfanning <[email protected]>
    
    * scalafmt
    
    * Create crdt-expiry.excludes
    
    * Update crdt-expiry.excludes
    
    ---------
    
    Co-authored-by: copilot-swe-agent[bot] 
<[email protected]>
    Co-authored-by: pjfanning <[email protected]>
---
 .../ddata/typed/internal/ReplicatorBehavior.scala  |   1 +
 .../cluster/ddata/typed/javadsl/Replicator.scala   |   5 +
 .../cluster/ddata/typed/scaladsl/Replicator.scala  |   9 +
 .../ddata/protobuf/msg/ReplicatorMessages.java     | 198 +++++++++++++
 .../2.0.x.backwards.excludes/crdt-expiry.excludes  |  34 +++
 .../src/main/protobuf/ReplicatorMessages.proto     |   2 +
 distributed-data/src/main/resources/reference.conf |  10 +
 .../apache/pekko/cluster/ddata/DurableStore.scala  |  60 +++-
 .../apache/pekko/cluster/ddata/Replicator.scala    | 311 +++++++++++++++++----
 .../protobuf/ReplicatorMessageSerializer.scala     |  23 +-
 .../cluster/ddata/ReplicatorSettingsSpec.scala     |  16 ++
 .../protobuf/ReplicatorMessageSerializerSpec.scala |  21 +-
 12 files changed, 619 insertions(+), 71 deletions(-)

diff --git 
a/cluster-typed/src/main/scala/org/apache/pekko/cluster/ddata/typed/internal/ReplicatorBehavior.scala
 
b/cluster-typed/src/main/scala/org/apache/pekko/cluster/ddata/typed/internal/ReplicatorBehavior.scala
index 9988fb3076..d245699ca8 100644
--- 
a/cluster-typed/src/main/scala/org/apache/pekko/cluster/ddata/typed/internal/ReplicatorBehavior.scala
+++ 
b/cluster-typed/src/main/scala/org/apache/pekko/cluster/ddata/typed/internal/ReplicatorBehavior.scala
@@ -159,6 +159,7 @@ import pekko.util.Timeout
                 rsp match {
                   case chg: dd.Replicator.Changed[_] => subscriber ! 
JReplicator.Changed(chg.key)(chg.dataValue)
                   case del: dd.Replicator.Deleted[_] => subscriber ! 
JReplicator.Deleted(del.key)
+                  case exp: dd.Replicator.Expired[_] => subscriber ! 
JReplicator.Expired(exp.key)
                 }
                 Behaviors.same
 
diff --git 
a/cluster-typed/src/main/scala/org/apache/pekko/cluster/ddata/typed/javadsl/Replicator.scala
 
b/cluster-typed/src/main/scala/org/apache/pekko/cluster/ddata/typed/javadsl/Replicator.scala
index bf391d57ac..c6b536a5d1 100644
--- 
a/cluster-typed/src/main/scala/org/apache/pekko/cluster/ddata/typed/javadsl/Replicator.scala
+++ 
b/cluster-typed/src/main/scala/org/apache/pekko/cluster/ddata/typed/javadsl/Replicator.scala
@@ -313,6 +313,11 @@ object Replicator {
    */
   final case class Deleted[A <: ReplicatedData](key: Key[A]) extends 
SubscribeResponse[A]
 
+  /**
+   * @see [[Replicator.Subscribe]]
+   */
+  final case class Expired[A <: ReplicatedData](key: Key[A]) extends 
SubscribeResponse[A]
+
   /**
    * Send this message to the local `Replicator` to delete a data value for the
    * given `key`. The `Replicator` will reply with one of the 
[[DeleteResponse]] messages.
diff --git 
a/cluster-typed/src/main/scala/org/apache/pekko/cluster/ddata/typed/scaladsl/Replicator.scala
 
b/cluster-typed/src/main/scala/org/apache/pekko/cluster/ddata/typed/scaladsl/Replicator.scala
index 36cf2f13cf..190c44c37f 100644
--- 
a/cluster-typed/src/main/scala/org/apache/pekko/cluster/ddata/typed/scaladsl/Replicator.scala
+++ 
b/cluster-typed/src/main/scala/org/apache/pekko/cluster/ddata/typed/scaladsl/Replicator.scala
@@ -301,6 +301,15 @@ object Replicator {
    */
   type Deleted[A <: ReplicatedData] = dd.Replicator.Deleted[A]
 
+  object Expired {
+    def unapply[A <: ReplicatedData](expired: Expired[A]): Option[Key[A]] = 
Some(expired.key)
+  }
+
+  /**
+   * @see [[Subscribe]]
+   */
+  type Expired[A <: ReplicatedData] = dd.Replicator.Expired[A]
+
   object Delete {
 
     /**
diff --git 
a/distributed-data/src/main/java/org/apache/pekko/cluster/ddata/protobuf/msg/ReplicatorMessages.java
 
b/distributed-data/src/main/java/org/apache/pekko/cluster/ddata/protobuf/msg/ReplicatorMessages.java
index c39780857b..8fc406ea5c 100644
--- 
a/distributed-data/src/main/java/org/apache/pekko/cluster/ddata/protobuf/msg/ReplicatorMessages.java
+++ 
b/distributed-data/src/main/java/org/apache/pekko/cluster/ddata/protobuf/msg/ReplicatorMessages.java
@@ -11578,6 +11578,17 @@ public final class ReplicatorMessages extends 
org.apache.pekko.protobufv3.intern
        * @return The digest.
        */
       org.apache.pekko.protobufv3.internal.ByteString getDigest();
+
+      /**
+       * <code>optional sint64 usedTimestamp = 3;</code>
+       * @return Whether the usedTimestamp field is set.
+       */
+      boolean hasUsedTimestamp();
+      /**
+       * <code>optional sint64 usedTimestamp = 3;</code>
+       * @return The usedTimestamp.
+       */
+      long getUsedTimestamp();
     }
     /**
      * Protobuf type {@code org.apache.pekko.cluster.ddata.Status.Entry}
@@ -11687,6 +11698,25 @@ public final class ReplicatorMessages extends 
org.apache.pekko.protobufv3.intern
         return digest_;
       }
 
+      public static final int USEDTIMESTAMP_FIELD_NUMBER = 3;
+      private long usedTimestamp_;
+      /**
+       * <code>optional sint64 usedTimestamp = 3;</code>
+       * @return Whether the usedTimestamp field is set.
+       */
+      @java.lang.Override
+      public boolean hasUsedTimestamp() {
+        return ((bitField0_ & 0x00000004) != 0);
+      }
+      /**
+       * <code>optional sint64 usedTimestamp = 3;</code>
+       * @return The usedTimestamp.
+       */
+      @java.lang.Override
+      public long getUsedTimestamp() {
+        return usedTimestamp_;
+      }
+
       private byte memoizedIsInitialized = -1;
       @java.lang.Override
       public final boolean isInitialized() {
@@ -11715,6 +11745,9 @@ public final class ReplicatorMessages extends 
org.apache.pekko.protobufv3.intern
         if (((bitField0_ & 0x00000002) != 0)) {
           output.writeBytes(2, digest_);
         }
+        if (((bitField0_ & 0x00000004) != 0)) {
+          output.writeSInt64(3, usedTimestamp_);
+        }
         getUnknownFields().writeTo(output);
       }
 
@@ -11731,6 +11764,10 @@ public final class ReplicatorMessages extends 
org.apache.pekko.protobufv3.intern
           size += org.apache.pekko.protobufv3.internal.CodedOutputStream
             .computeBytesSize(2, digest_);
         }
+        if (((bitField0_ & 0x00000004) != 0)) {
+          size += org.apache.pekko.protobufv3.internal.CodedOutputStream
+            .computeSInt64Size(3, usedTimestamp_);
+        }
         size += getUnknownFields().getSerializedSize();
         memoizedSize = size;
         return size;
@@ -11756,6 +11793,11 @@ public final class ReplicatorMessages extends 
org.apache.pekko.protobufv3.intern
           if (!getDigest()
               .equals(other.getDigest())) return false;
         }
+        if (hasUsedTimestamp() != other.hasUsedTimestamp()) return false;
+        if (hasUsedTimestamp()) {
+          if (getUsedTimestamp()
+              != other.getUsedTimestamp()) return false;
+        }
         if (!getUnknownFields().equals(other.getUnknownFields())) return false;
         return true;
       }
@@ -11775,6 +11817,11 @@ public final class ReplicatorMessages extends 
org.apache.pekko.protobufv3.intern
           hash = (37 * hash) + DIGEST_FIELD_NUMBER;
           hash = (53 * hash) + getDigest().hashCode();
         }
+        if (hasUsedTimestamp()) {
+          hash = (37 * hash) + USEDTIMESTAMP_FIELD_NUMBER;
+          hash = (53 * hash) + 
org.apache.pekko.protobufv3.internal.Internal.hashLong(
+              getUsedTimestamp());
+        }
         hash = (29 * hash) + getUnknownFields().hashCode();
         memoizedHashCode = hash;
         return hash;
@@ -11908,6 +11955,7 @@ public final class ReplicatorMessages extends 
org.apache.pekko.protobufv3.intern
           bitField0_ = 0;
           key_ = "";
           digest_ = org.apache.pekko.protobufv3.internal.ByteString.EMPTY;
+          usedTimestamp_ = 0L;
           return this;
         }
 
@@ -11950,6 +11998,10 @@ public final class ReplicatorMessages extends 
org.apache.pekko.protobufv3.intern
             result.digest_ = digest_;
             to_bitField0_ |= 0x00000002;
           }
+          if (((from_bitField0_ & 0x00000004) != 0)) {
+            result.usedTimestamp_ = usedTimestamp_;
+            to_bitField0_ |= 0x00000004;
+          }
           result.bitField0_ |= to_bitField0_;
         }
 
@@ -11973,6 +12025,9 @@ public final class ReplicatorMessages extends 
org.apache.pekko.protobufv3.intern
           if (other.hasDigest()) {
             setDigest(other.getDigest());
           }
+          if (other.hasUsedTimestamp()) {
+            setUsedTimestamp(other.getUsedTimestamp());
+          }
           this.mergeUnknownFields(other.getUnknownFields());
           onChanged();
           return this;
@@ -12015,6 +12070,11 @@ public final class ReplicatorMessages extends 
org.apache.pekko.protobufv3.intern
                   bitField0_ |= 0x00000002;
                   break;
                 } // case 18
+                case 24: {
+                  usedTimestamp_ = input.readSInt64();
+                  bitField0_ |= 0x00000004;
+                  break;
+                } // case 24
                 default: {
                   if (!super.parseUnknownField(input, extensionRegistry, tag)) 
{
                     done = true; // was an endgroup tag
@@ -12152,6 +12212,45 @@ public final class ReplicatorMessages extends 
org.apache.pekko.protobufv3.intern
           return this;
         }
 
+        private long usedTimestamp_;
+        /**
+         * <code>optional sint64 usedTimestamp = 3;</code>
+         * @return Whether the usedTimestamp field is set.
+         */
+        @java.lang.Override
+        public boolean hasUsedTimestamp() {
+          return ((bitField0_ & 0x00000004) != 0);
+        }
+        /**
+         * <code>optional sint64 usedTimestamp = 3;</code>
+         * @return The usedTimestamp.
+         */
+        @java.lang.Override
+        public long getUsedTimestamp() {
+          return usedTimestamp_;
+        }
+        /**
+         * <code>optional sint64 usedTimestamp = 3;</code>
+         * @param value The usedTimestamp to set.
+         * @return This builder for chaining.
+         */
+        public Builder setUsedTimestamp(long value) {
+          usedTimestamp_ = value;
+          bitField0_ |= 0x00000004;
+          onChanged();
+          return this;
+        }
+        /**
+         * <code>optional sint64 usedTimestamp = 3;</code>
+         * @return This builder for chaining.
+         */
+        public Builder clearUsedTimestamp() {
+          bitField0_ = (bitField0_ & ~0x00000004);
+          usedTimestamp_ = 0L;
+          onChanged();
+          return this;
+        }
+
         // 
@@protoc_insertion_point(builder_scope:org.apache.pekko.cluster.ddata.Status.Entry)
       }
 
@@ -13393,6 +13492,17 @@ public final class ReplicatorMessages extends 
org.apache.pekko.protobufv3.intern
        * <code>required .org.apache.pekko.cluster.ddata.DataEnvelope envelope 
= 2;</code>
        */
       
org.apache.pekko.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelopeOrBuilder
 getEnvelopeOrBuilder();
+
+      /**
+       * <code>optional sint64 usedTimestamp = 3;</code>
+       * @return Whether the usedTimestamp field is set.
+       */
+      boolean hasUsedTimestamp();
+      /**
+       * <code>optional sint64 usedTimestamp = 3;</code>
+       * @return The usedTimestamp.
+       */
+      long getUsedTimestamp();
     }
     /**
      * Protobuf type {@code org.apache.pekko.cluster.ddata.Gossip.Entry}
@@ -13508,6 +13618,25 @@ public final class ReplicatorMessages extends 
org.apache.pekko.protobufv3.intern
         return envelope_ == null ? 
org.apache.pekko.cluster.ddata.protobuf.msg.ReplicatorMessages.DataEnvelope.getDefaultInstance()
 : envelope_;
       }
 
+      public static final int USEDTIMESTAMP_FIELD_NUMBER = 3;
+      private long usedTimestamp_;
+      /**
+       * <code>optional sint64 usedTimestamp = 3;</code>
+       * @return Whether the usedTimestamp field is set.
+       */
+      @java.lang.Override
+      public boolean hasUsedTimestamp() {
+        return ((bitField0_ & 0x00000004) != 0);
+      }
+      /**
+       * <code>optional sint64 usedTimestamp = 3;</code>
+       * @return The usedTimestamp.
+       */
+      @java.lang.Override
+      public long getUsedTimestamp() {
+        return usedTimestamp_;
+      }
+
       private byte memoizedIsInitialized = -1;
       @java.lang.Override
       public final boolean isInitialized() {
@@ -13540,6 +13669,9 @@ public final class ReplicatorMessages extends 
org.apache.pekko.protobufv3.intern
         if (((bitField0_ & 0x00000002) != 0)) {
           output.writeMessage(2, getEnvelope());
         }
+        if (((bitField0_ & 0x00000004) != 0)) {
+          output.writeSInt64(3, usedTimestamp_);
+        }
         getUnknownFields().writeTo(output);
       }
 
@@ -13556,6 +13688,10 @@ public final class ReplicatorMessages extends 
org.apache.pekko.protobufv3.intern
           size += org.apache.pekko.protobufv3.internal.CodedOutputStream
             .computeMessageSize(2, getEnvelope());
         }
+        if (((bitField0_ & 0x00000004) != 0)) {
+          size += org.apache.pekko.protobufv3.internal.CodedOutputStream
+            .computeSInt64Size(3, usedTimestamp_);
+        }
         size += getUnknownFields().getSerializedSize();
         memoizedSize = size;
         return size;
@@ -13581,6 +13717,11 @@ public final class ReplicatorMessages extends 
org.apache.pekko.protobufv3.intern
           if (!getEnvelope()
               .equals(other.getEnvelope())) return false;
         }
+        if (hasUsedTimestamp() != other.hasUsedTimestamp()) return false;
+        if (hasUsedTimestamp()) {
+          if (getUsedTimestamp()
+              != other.getUsedTimestamp()) return false;
+        }
         if (!getUnknownFields().equals(other.getUnknownFields())) return false;
         return true;
       }
@@ -13600,6 +13741,11 @@ public final class ReplicatorMessages extends 
org.apache.pekko.protobufv3.intern
           hash = (37 * hash) + ENVELOPE_FIELD_NUMBER;
           hash = (53 * hash) + getEnvelope().hashCode();
         }
+        if (hasUsedTimestamp()) {
+          hash = (37 * hash) + USEDTIMESTAMP_FIELD_NUMBER;
+          hash = (53 * hash) + 
org.apache.pekko.protobufv3.internal.Internal.hashLong(
+              getUsedTimestamp());
+        }
         hash = (29 * hash) + getUnknownFields().hashCode();
         memoizedHashCode = hash;
         return hash;
@@ -13743,6 +13889,7 @@ public final class ReplicatorMessages extends 
org.apache.pekko.protobufv3.intern
             envelopeBuilder_.dispose();
             envelopeBuilder_ = null;
           }
+          usedTimestamp_ = 0L;
           return this;
         }
 
@@ -13787,6 +13934,10 @@ public final class ReplicatorMessages extends 
org.apache.pekko.protobufv3.intern
                 : envelopeBuilder_.build();
             to_bitField0_ |= 0x00000002;
           }
+          if (((from_bitField0_ & 0x00000004) != 0)) {
+            result.usedTimestamp_ = usedTimestamp_;
+            to_bitField0_ |= 0x00000004;
+          }
           result.bitField0_ |= to_bitField0_;
         }
 
@@ -13810,6 +13961,9 @@ public final class ReplicatorMessages extends 
org.apache.pekko.protobufv3.intern
           if (other.hasEnvelope()) {
             mergeEnvelope(other.getEnvelope());
           }
+          if (other.hasUsedTimestamp()) {
+            setUsedTimestamp(other.getUsedTimestamp());
+          }
           this.mergeUnknownFields(other.getUnknownFields());
           onChanged();
           return this;
@@ -13857,6 +14011,11 @@ public final class ReplicatorMessages extends 
org.apache.pekko.protobufv3.intern
                   bitField0_ |= 0x00000002;
                   break;
                 } // case 18
+                case 24: {
+                  usedTimestamp_ = input.readSInt64();
+                  bitField0_ |= 0x00000004;
+                  break;
+                } // case 24
                 default: {
                   if (!super.parseUnknownField(input, extensionRegistry, tag)) 
{
                     done = true; // was an endgroup tag
@@ -14076,6 +14235,45 @@ public final class ReplicatorMessages extends 
org.apache.pekko.protobufv3.intern
         }
 
         // 
@@protoc_insertion_point(builder_scope:org.apache.pekko.cluster.ddata.Gossip.Entry)
+
+        private long usedTimestamp_;
+        /**
+         * <code>optional sint64 usedTimestamp = 3;</code>
+         * @return Whether the usedTimestamp field is set.
+         */
+        @java.lang.Override
+        public boolean hasUsedTimestamp() {
+          return ((bitField0_ & 0x00000004) != 0);
+        }
+        /**
+         * <code>optional sint64 usedTimestamp = 3;</code>
+         * @return The usedTimestamp.
+         */
+        @java.lang.Override
+        public long getUsedTimestamp() {
+          return usedTimestamp_;
+        }
+        /**
+         * <code>optional sint64 usedTimestamp = 3;</code>
+         * @param value The usedTimestamp to set.
+         * @return This builder for chaining.
+         */
+        public Builder setUsedTimestamp(long value) {
+          usedTimestamp_ = value;
+          bitField0_ |= 0x00000004;
+          onChanged();
+          return this;
+        }
+        /**
+         * <code>optional sint64 usedTimestamp = 3;</code>
+         * @return This builder for chaining.
+         */
+        public Builder clearUsedTimestamp() {
+          bitField0_ = (bitField0_ & ~0x00000004);
+          usedTimestamp_ = 0L;
+          onChanged();
+          return this;
+        }
       }
 
       // 
@@protoc_insertion_point(class_scope:org.apache.pekko.cluster.ddata.Gossip.Entry)
diff --git 
a/distributed-data/src/main/mima-filters/2.0.x.backwards.excludes/crdt-expiry.excludes
 
b/distributed-data/src/main/mima-filters/2.0.x.backwards.excludes/crdt-expiry.excludes
new file mode 100644
index 0000000000..b42e372aea
--- /dev/null
+++ 
b/distributed-data/src/main/mima-filters/2.0.x.backwards.excludes/crdt-expiry.excludes
@@ -0,0 +1,34 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Support CRDT Key Expiry
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.cluster.ddata.Replicator.dataEntries")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.cluster.ddata.Replicator.dataEntries_=")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.cluster.ddata.Replicator.receiveStatus")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.cluster.ddata.Replicator.receiveGossip")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.cluster.ddata.Replicator#Internal#Gossip.updatedData")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.cluster.ddata.Replicator#Internal#Gossip.copy*")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.cluster.ddata.Replicator#Internal#Gossip.this")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.cluster.ddata.Replicator#Internal#Gossip.apply")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.cluster.ddata.Replicator#Internal#Gossip.unapply")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.cluster.ddata.Replicator#Internal#Gossip._1")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.cluster.ddata.Replicator#Internal#Status.digests")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.cluster.ddata.Replicator#Internal#Status.copy*")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.cluster.ddata.Replicator#Internal#Status.this")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.cluster.ddata.Replicator#Internal#Status.apply")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.cluster.ddata.Replicator#Internal#Status.unapply")
+ProblemFilters.exclude[IncompatibleSignatureProblem]("org.apache.pekko.cluster.ddata.Replicator#Internal#Status._1")
diff --git a/distributed-data/src/main/protobuf/ReplicatorMessages.proto 
b/distributed-data/src/main/protobuf/ReplicatorMessages.proto
index 2471285636..240e490d93 100644
--- a/distributed-data/src/main/protobuf/ReplicatorMessages.proto
+++ b/distributed-data/src/main/protobuf/ReplicatorMessages.proto
@@ -96,6 +96,7 @@ message Status {
   message Entry {
     required string key = 1;
     required bytes digest = 2;
+    optional sint64 usedTimestamp = 3;
   }
   
   required uint32 chunk = 1;
@@ -109,6 +110,7 @@ message Gossip {
   message Entry {
     required string key = 1;
     required DataEnvelope envelope = 2;
+    optional sint64 usedTimestamp = 3;
   }
   
   required bool sendBack = 1;
diff --git a/distributed-data/src/main/resources/reference.conf 
b/distributed-data/src/main/resources/reference.conf
index f7a5c5b5ad..49457a6fe1 100644
--- a/distributed-data/src/main/resources/reference.conf
+++ b/distributed-data/src/main/resources/reference.conf
@@ -81,6 +81,16 @@ pekko.cluster.distributed-data {
     # This is number of elements or similar size hint, not size in bytes.
     max-delta-size = 50
   }
+
+  # Map of keys and inactivity duration for entries that will automatically be 
removed
+  # without tombstones when they have been inactive for the given duration.
+  # Prefix matching is supported by using * at the end of a key.
+  # Matching tombstones will also be removed after the expiry duration.
+  expire-keys-after-inactivity {
+    # Example syntax:
+    # "key-1" = 10 minutes
+    # "cache-*" = 2 minutes
+  }
   
   durable {
     # List of keys that are durable. Prefix matching is supported by using * 
at the
diff --git 
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/DurableStore.scala
 
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/DurableStore.scala
index b7db7e1935..b894976894 100644
--- 
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/DurableStore.scala
+++ 
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/DurableStore.scala
@@ -60,6 +60,9 @@ import com.typesafe.config.Config
  * When the `Replicator` needs to store a value it sends a `Store` message
  * to the durable store actor, which must then reply with the `successMsg` or
  * `failureMsg` to the `replyTo`.
+ *
+ * When entries have expired the `Replicator` sends a `Expire` message to the 
durable
+ * store actor, which can delete the entries from the backend store.
  */
 object DurableStore {
 
@@ -87,6 +90,11 @@ object DurableStore {
     def this(message: String) = this(message, null)
   }
 
+  /**
+   * Request to expire (remove) entries.
+   */
+  final case class Expire(keys: Set[KeyId])
+
   /**
    * Wrapper class for serialization of a data value.
    * The `ReplicatorMessageSerializer` will serialize/deserialize
@@ -149,7 +157,8 @@ final class LmdbDurableStore(config: Config) extends Actor 
with ActorLogging {
   private def lmdb(): Lmdb = _lmdb match {
     case OptionVal.Some(l) => l
     case _                 =>
-      val t0 = System.nanoTime()
+      val debugEnabled = log.isDebugEnabled
+      val t0 = if (debugEnabled) System.nanoTime() else 0L
       log.info("Using durable data in LMDB directory [{}]", 
dir.getCanonicalPath)
       val env = {
         val mapSize = config.getBytes("lmdb.map-size")
@@ -162,11 +171,11 @@ final class LmdbDurableStore(config: Config) extends 
Actor with ActorLogging {
       val keyBuffer = ByteBuffer.allocateDirect(env.getMaxKeySize)
       val valueBuffer = ByteBuffer.allocateDirect(100 * 1024) // will grow 
when needed
 
-      if (log.isDebugEnabled)
+      if (debugEnabled)
         log.debug(
           "Init of LMDB in directory [{}] took [{} ms]",
           dir.getCanonicalPath,
-          TimeUnit.NANOSECONDS.toMillis(System.nanoTime - t0))
+          TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t0))
       val l = Lmdb(env, db, keyBuffer, valueBuffer)
       _lmdb = OptionVal.Some(l)
       l
@@ -208,8 +217,9 @@ final class LmdbDurableStore(config: Config) extends Actor 
with ActorLogging {
   def init: Receive = {
     case LoadAll =>
       if (dir.exists && dir.list().length > 0) {
+        val debugEnabled = log.isDebugEnabled
+        val t0 = if (debugEnabled) System.nanoTime() else 0L
         val l = lmdb()
-        val t0 = System.nanoTime()
         val tx = l.env.txnRead()
         try {
           val iter = l.db.iterate(tx)
@@ -228,8 +238,11 @@ final class LmdbDurableStore(config: Config) extends Actor 
with ActorLogging {
             if (loadData.data.nonEmpty)
               sender() ! loadData
             sender() ! LoadAllCompleted
-            if (log.isDebugEnabled)
-              log.debug("load all of [{}] entries took [{} ms]", n, 
TimeUnit.NANOSECONDS.toMillis(System.nanoTime - t0))
+            if (debugEnabled)
+              log.debug(
+                "load all of [{}] entries took [{} ms]",
+                n,
+                TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t0))
             context.become(active)
           } finally {
             Try(iter.close())
@@ -275,6 +288,9 @@ final class LmdbDurableStore(config: Config) extends Actor 
with ActorLogging {
 
     case WriteBehind =>
       writeBehind()
+
+    case Expire(keys) =>
+      dbDelete(keys)
   }
 
   def dbPut(tx: OptionVal[Txn[ByteBuffer]], key: KeyId, data: 
DurableDataEnvelope): Unit = {
@@ -297,7 +313,8 @@ final class LmdbDurableStore(config: Config) extends Actor 
with ActorLogging {
 
   def writeBehind(): Unit = {
     if (!pending.isEmpty()) {
-      val t0 = System.nanoTime()
+      val debugEnabled = log.isDebugEnabled
+      val t0 = if (debugEnabled) System.nanoTime() else 0L
       val tx = lmdb().env.txnWrite()
       try {
         val iter = pending.entrySet.iterator
@@ -306,11 +323,11 @@ final class LmdbDurableStore(config: Config) extends 
Actor with ActorLogging {
           dbPut(OptionVal.Some(tx), entry.getKey, entry.getValue)
         }
         tx.commit()
-        if (log.isDebugEnabled)
+        if (debugEnabled)
           log.debug(
             "store and commit of [{}] entries took [{} ms]",
             pending.size,
-            TimeUnit.NANOSECONDS.toMillis(System.nanoTime - t0))
+            TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t0))
       } catch {
         case NonFatal(e) =>
           import scala.jdk.CollectionConverters._
@@ -322,4 +339,29 @@ final class LmdbDurableStore(config: Config) extends Actor 
with ActorLogging {
     }
   }
 
+  def dbDelete(keys: Set[KeyId]): Unit = {
+    val debugEnabled = log.isDebugEnabled
+    val t0 = if (debugEnabled) System.nanoTime() else 0L
+    val l = lmdb()
+    val tx = lmdb().env.txnWrite()
+    try {
+      keys.foreach { key =>
+        l.keyBuffer.put(key.getBytes(ByteString.UTF_8)).flip()
+        l.db.delete(tx, l.keyBuffer)
+      }
+      tx.commit()
+      if (debugEnabled)
+        log.debug(
+          "delete and commit of [{}] entries took [{} ms]",
+          keys.size,
+          TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - t0))
+    } catch {
+      case NonFatal(e) =>
+        log.error(e, "failed to delete [{}]", keys.mkString(","))
+        tx.abort()
+    } finally {
+      l.keyBuffer.clear()
+    }
+  }
+
 }
diff --git 
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/Replicator.scala
 
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/Replicator.scala
index b4b69cab6b..bb94ca7e8b 100644
--- 
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/Replicator.scala
+++ 
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/Replicator.scala
@@ -112,7 +112,25 @@ object ReplicatorSettings {
       deltaCrdtEnabled = config.getBoolean("delta-crdt.enabled"),
       maxDeltaSize = config.getInt("delta-crdt.max-delta-size"),
       preferOldest = config.getBoolean("prefer-oldest"),
-      logDataSizeExceeding = logDataSizeExceeding)
+      logDataSizeExceeding = logDataSizeExceeding,
+      expiryKeys = parseExpiry(config))
+  }
+
+  /**
+   * INTERNAL API
+   */
+  @InternalApi private[pekko] def parseExpiry(config: Config): Map[KeyId, 
FiniteDuration] = {
+    import scala.jdk.CollectionConverters._
+    val expiryConfig = config.getConfig("expire-keys-after-inactivity")
+    expiryConfig.root
+      .keySet()
+      .iterator()
+      .asScala
+      .map { key =>
+        val quotedKey = s""""$key"""" // must use quotes because of the 
wildcard *
+        key -> expiryConfig.getDuration(quotedKey, MILLISECONDS).millis
+      }
+      .toMap
   }
 
   /**
@@ -158,6 +176,10 @@ object ReplicatorSettings {
  *        in the `Set`.
  * @param preferOldest Update and Get operations are sent to oldest nodes 
first.
  * @param logDataSizeExceeding Log data size.
+ * @param expiryKeys Map of keys and inactivity duration for entries that will 
automatically be removed
+ *        without tombstones when they have been inactive for the given 
duration.
+ *        Prefix matching is supported by using * at the end of a key.
+ *        Matching tombstones will also be removed after the expiry duration.
  */
 final class ReplicatorSettings(
     val roles: Set[String],
@@ -174,7 +196,8 @@ final class ReplicatorSettings(
     val deltaCrdtEnabled: Boolean,
     val maxDeltaSize: Int,
     val preferOldest: Boolean,
-    val logDataSizeExceeding: Option[Int]) {
+    val logDataSizeExceeding: Option[Int],
+    val expiryKeys: Map[KeyId, FiniteDuration]) {
 
   def withRole(role: String): ReplicatorSettings = copy(roles = 
ReplicatorSettings.roleOption(role).toSet)
 
@@ -247,6 +270,55 @@ final class ReplicatorSettings(
   def withLogDataSizeExceeding(logDataSizeExceeding: Int): ReplicatorSettings =
     copy(logDataSizeExceeding = Some(logDataSizeExceeding))
 
+  // for backwards compatibility
+  def this(
+      roles: Set[String],
+      gossipInterval: FiniteDuration,
+      notifySubscribersInterval: FiniteDuration,
+      maxDeltaElements: Int,
+      dispatcher: String,
+      pruningInterval: FiniteDuration,
+      maxPruningDissemination: FiniteDuration,
+      durableStoreProps: Either[(String, Config), Props],
+      durableKeys: Set[KeyId],
+      pruningMarkerTimeToLive: FiniteDuration,
+      durablePruningMarkerTimeToLive: FiniteDuration,
+      deltaCrdtEnabled: Boolean,
+      maxDeltaSize: Int,
+      preferOldest: Boolean,
+      logDataSizeExceeding: Option[Int]) =
+    this(
+      roles,
+      gossipInterval,
+      notifySubscribersInterval,
+      maxDeltaElements,
+      dispatcher,
+      pruningInterval,
+      maxPruningDissemination,
+      durableStoreProps,
+      durableKeys,
+      pruningMarkerTimeToLive,
+      durablePruningMarkerTimeToLive,
+      deltaCrdtEnabled,
+      maxDeltaSize,
+      preferOldest,
+      logDataSizeExceeding,
+      expiryKeys = Map.empty)
+
+  /**
+   * Scala API
+   */
+  def withExpiryKeys(expiryKeys: Map[KeyId, FiniteDuration]): 
ReplicatorSettings =
+    copy(expiryKeys = expiryKeys)
+
+  /**
+   * Java API
+   */
+  def withExpiryKeys(expiryKeys: java.util.Map[String, java.time.Duration]): 
ReplicatorSettings = {
+    import scala.jdk.CollectionConverters._
+    withExpiryKeys(expiryKeys.asScala.iterator.map { case (key, value) => key 
-> value.toScala }.toMap)
+  }
+
   private def copy(
       roles: Set[String] = roles,
       gossipInterval: FiniteDuration = gossipInterval,
@@ -262,7 +334,8 @@ final class ReplicatorSettings(
       deltaCrdtEnabled: Boolean = deltaCrdtEnabled,
       maxDeltaSize: Int = maxDeltaSize,
       preferOldest: Boolean = preferOldest,
-      logDataSizeExceeding: Option[Int] = logDataSizeExceeding): 
ReplicatorSettings =
+      logDataSizeExceeding: Option[Int] = logDataSizeExceeding,
+      expiryKeys: Map[KeyId, FiniteDuration] = expiryKeys): ReplicatorSettings 
=
     new ReplicatorSettings(
       roles,
       gossipInterval,
@@ -278,10 +351,12 @@ final class ReplicatorSettings(
       deltaCrdtEnabled,
       maxDeltaSize,
       preferOldest,
-      logDataSizeExceeding)
+      logDataSizeExceeding,
+      expiryKeys)
 }
 
 object Replicator {
+  private type Timestamp = Long
 
   /**
    * Factory method for the [[pekko.actor.Props]] of the [[Replicator]] actor.
@@ -500,6 +575,9 @@ object Replicator {
    *
    * If the key is deleted the subscriber is notified with a [[Deleted]]
    * message.
+   *
+   * If the key is expired the subscriber is notified with an [[Expired]]
+   * message.
    */
   final case class Subscribe[A <: ReplicatedData](key: Key[A], subscriber: 
ActorRef) extends ReplicatorMessage
 
@@ -546,6 +624,11 @@ object Replicator {
    */
   final case class Deleted[A <: ReplicatedData](key: Key[A]) extends 
SubscribeResponse[A]
 
+  /**
+   * @see [[Replicator.Subscribe]]
+   */
+  final case class Expired[A <: ReplicatedData](key: Key[A]) extends 
SubscribeResponse[A]
+
   object Update {
 
     /**
@@ -932,7 +1015,7 @@ object Replicator {
     }
 
     final case class Status(
-        digests: Map[KeyId, Digest],
+        digests: Map[KeyId, (Digest, Timestamp)],
         chunk: Int,
         totChunks: Int,
         toSystemUid: Option[Long],
@@ -942,12 +1025,12 @@ object Replicator {
       override def toString: String =
         digests
           .map {
-            case (key, bytes) => key + " -> " + bytes.map(byte => 
f"$byte%02x").mkString("")
+            case (key, (bytes, _)) => key + " -> " + bytes.map(byte => 
f"$byte%02x").mkString("")
           }
           .mkString("Status(", ", ", ")")
     }
     final case class Gossip(
-        updatedData: Map[KeyId, DataEnvelope],
+        updatedData: Map[KeyId, (DataEnvelope, Timestamp)],
         sendBack: Boolean,
         toSystemUid: Option[Long],
         fromSystemUid: Option[Long])
@@ -1227,6 +1310,11 @@ final class Replicator(settings: ReplicatorSettings) 
extends Actor with ActorLog
   val serializer = 
SerializationExtension(context.system).serializerFor(classOf[DataEnvelope])
   val maxPruningDisseminationNanos = maxPruningDissemination.toNanos
 
+  val expiryWildcards = settings.expiryKeys.collect { case (k, v) if 
k.endsWith("*") => k.dropRight(1) -> v }
+  val expiryEnabled: Boolean = settings.expiryKeys.nonEmpty
+  // updated on the gossip tick to avoid too many calls to 
`currentTimeMillis()`
+  private var currentUsedTimestamp = if (expiryEnabled) 
System.currentTimeMillis() else 0L
+
   val hasDurableKeys = settings.durableKeys.nonEmpty
   val durable = settings.durableKeys.filterNot(_.endsWith("*"))
   val durableWildcards = settings.durableKeys.collect { case k if 
k.endsWith("*") => k.dropRight(1) }
@@ -1311,8 +1399,8 @@ final class Replicator(settings: ReplicatorSettings) 
extends Actor with ActorLog
   var unreachable = Set.empty[UniqueAddress]
 
   // the actual data
-  var dataEntries = Map.empty[KeyId, (DataEnvelope, Digest)]
-  // keys that have changed, Changed event published to subscribers on 
FlushChanges
+  var dataEntries = Map.empty[KeyId, (DataEnvelope, Digest, Timestamp)]
+  // keys that have changed, Changed, Deleted, Expired event published to 
subscribers on FlushChanges
   var changed = Set.empty[KeyId]
 
   // for splitting up gossip in chunks
@@ -1514,6 +1602,7 @@ final class Replicator(settings: ReplicatorSettings) 
extends Actor with ActorLog
   def receiveGet(key: KeyR, consistency: ReadConsistency, req: Option[Any]): 
Unit = {
     val localValue = getData(key.id)
     log.debug("Received Get for key [{}].", key)
+    updateUsedTimestamp(key.id, currentUsedTimestamp)
     if (isLocalGet(consistency)) {
       val reply = localValue match {
         case Some(DataEnvelope(DeletedData, _, _)) => GetDataDeleted(key, req)
@@ -1604,6 +1693,8 @@ final class Replicator(settings: ReplicatorSettings) 
extends Actor with ActorLog
         // so that the latest delta version is used
         val newEnvelope = setData(key.id, envelope)
 
+        updateUsedTimestamp(key.id, currentUsedTimestamp)
+
         val durable = isDurable(key.id)
         if (isLocalUpdate(writeConsistency)) {
           if (durable)
@@ -1724,7 +1815,7 @@ final class Replicator(settings: ReplicatorSettings) 
extends Actor with ActorLog
   def receiveGetKeyIds(): Unit = {
     val keys: Set[KeyId] = dataEntries.iterator
       .collect {
-        case (key, (DataEnvelope(data, _, _), _)) if data != DeletedData => key
+        case (key, (DataEnvelope(data, _, _), _, _)) if data != DeletedData => 
key
       }
       .to(immutable.Set)
     replyTo ! GetKeyIdsResult(keys)
@@ -1737,6 +1828,7 @@ final class Replicator(settings: ReplicatorSettings) 
extends Actor with ActorLog
         replyTo ! DataDeleted(key, req)
       case _ =>
         setData(key.id, DeletedEnvelope)
+        updateUsedTimestamp(key.id, currentUsedTimestamp)
         payloadSizeAggregator.remove(key.id)
         val durable = isDurable(key.id)
         if (isLocalUpdate(consistency)) {
@@ -1801,7 +1893,7 @@ final class Replicator(settings: ReplicatorSettings) 
extends Actor with ActorLog
       } else if (newEnvelope.data == DeletedData) DeletedDigest
       else LazyDigest
 
-    dataEntries = dataEntries.updated(key, (newEnvelope, dig))
+    dataEntries = dataEntries.updated(key, (newEnvelope, dig, 
getUsedTimestamp(key)))
     if (newEnvelope.data == DeletedData)
       deltaPropagationSelector.delete(key)
     newEnvelope
@@ -1809,13 +1901,13 @@ final class Replicator(settings: ReplicatorSettings) 
extends Actor with ActorLog
 
   def getDigest(key: KeyId): Digest = {
     dataEntries.get(key) match {
-      case Some((envelope, LazyDigest)) =>
+      case Some((envelope, LazyDigest, usedTimestamp)) =>
         val (d, size) = digest(envelope)
         payloadSizeAggregator.updatePayloadSize(key, size)
-        dataEntries = dataEntries.updated(key, (envelope, d))
+        dataEntries = dataEntries.updated(key, (envelope, d, usedTimestamp))
         d
-      case Some((_, digest)) => digest
-      case None              => NotFoundDigest
+      case Some((_, digest, _)) => digest
+      case None                 => NotFoundDigest
     }
   }
 
@@ -1830,44 +1922,103 @@ final class Replicator(settings: ReplicatorSettings) 
extends Actor with ActorLog
       (dig, bytes.length)
     }
 
-  def getData(key: KeyId): Option[DataEnvelope] = dataEntries.get(key).map { 
case (envelope, _) => envelope }
+  def getData(key: KeyId): Option[DataEnvelope] =
+    dataEntries.get(key).map { case (envelope, _, _) => envelope }
+
+  def getExpiryDuration(key: KeyId): FiniteDuration = {
+    if (expiryEnabled) {
+      settings.expiryKeys.get(key) match {
+        case Some(d)                         => d
+        case None if expiryWildcards.isEmpty => Duration.Zero
+        case None                            =>
+          expiryWildcards
+            .collectFirst {
+              case (k, v) if key.startsWith(k) => v
+            }
+            .getOrElse(Duration.Zero)
+      }
+    } else {
+      Duration.Zero
+    }
+  }
+
+  def getUsedTimestamp(key: KeyId): Timestamp = {
+    if (expiryEnabled) {
+      dataEntries.get(key) match {
+        case Some((_, _, usedTimestamp)) => usedTimestamp
+        case None                        => 0L
+      }
+    } else {
+      0L
+    }
+  }
+
+  def isExpired(key: KeyId): Boolean = {
+    val now = System.currentTimeMillis()
+    isExpired(key, getUsedTimestamp(key), now)
+  }
+
+  def isExpired(key: KeyId, timestamp: Timestamp): Boolean = {
+    isExpired(key, timestamp, System.currentTimeMillis())
+  }
+
+  def isExpired(key: KeyId, timestamp: Timestamp, now: Long): Boolean = {
+    expiryEnabled && timestamp != 0L && timestamp <= now - 
getExpiryDuration(key).toMillis
+  }
+
+  def updateUsedTimestamp(key: KeyId, timestamp: Timestamp): Unit = {
+    if (expiryEnabled && timestamp != 0L) {
+      dataEntries.get(key).foreach {
+        case (existingEnvelope, existingDigest, existingTimestamp) =>
+          if (timestamp > existingTimestamp)
+            dataEntries = dataEntries.updated(key, (existingEnvelope, 
existingDigest, timestamp))
+      }
+    }
+  }
 
   def getDeltaSeqNr(key: KeyId, fromNode: UniqueAddress): Long =
     dataEntries.get(key) match {
-      case Some((DataEnvelope(_, _, deltaVersions), _)) => 
deltaVersions.versionAt(fromNode)
-      case None                                         => 0L
+      case Some((DataEnvelope(_, _, deltaVersions), _, _)) => 
deltaVersions.versionAt(fromNode)
+      case None                                            => 0L
     }
 
   def isNodeRemoved(node: UniqueAddress, keys: Iterable[KeyId]): Boolean = {
     removedNodes.contains(node) ||
     (keys.exists(key =>
       dataEntries.get(key) match {
-        case Some((DataEnvelope(_, pruning, _), _)) => pruning.contains(node)
-        case None                                   => false
+        case Some((DataEnvelope(_, pruning, _), _, _)) => 
pruning.contains(node)
+        case None                                      => false
       }))
   }
 
+  @nowarn("msg=deprecated")
   def receiveFlushChanges(): Unit = {
-    def notify(keyId: KeyId, subs: mutable.Set[ActorRef]): Unit = {
+    def notify(keyId: KeyId, subs: mutable.Set[ActorRef], 
sendExpiredIfMissing: Boolean): Unit = {
       val key = subscriptionKeys(keyId)
       getData(keyId) match {
         case Some(envelope) =>
           val msg = if (envelope.data == DeletedData) Deleted(key) else 
Changed(key)(envelope.data)
           subs.foreach { _ ! msg }
         case None =>
+          if (sendExpiredIfMissing) {
+            val msg = Expired(key)
+            subs.foreach {
+              _ ! msg
+            }
+          }
       }
     }
 
     if (subscribers.nonEmpty) {
       for (key <- changed; if subscribers.contains(key); subs <- 
subscribers.get(key))
-        notify(key, subs)
+        notify(key, subs, sendExpiredIfMissing = true)
     }
 
     // Changed event is sent to new subscribers even though the key has not 
changed,
-    // i.e. send current value
+    // i.e. send current value. Expired is not sent to new subscribers as the 
first event.
     if (newSubscribers.nonEmpty) {
       for ((key, subs) <- newSubscribers) {
-        notify(key, subs)
+        notify(key, subs, sendExpiredIfMissing = false)
         subs.foreach { subscribers.addBinding(key, _) }
       }
       newSubscribers.clear()
@@ -1953,6 +2104,7 @@ final class Replicator(settings: ReplicatorSettings) 
extends Actor with ActorLog
     }
 
   def receiveGossipTick(): Unit = {
+    cleanupExpired()
     if (fullStateGossipEnabled)
       selectRandomNode(allNodes.toVector).foreach(gossipTo)
   }
@@ -1961,12 +2113,9 @@ final class Replicator(settings: ReplicatorSettings) 
extends Actor with ActorLog
     val to = replica(address)
     val toSystemUid = Some(address.longUid)
     if (dataEntries.size <= maxDeltaElements) {
-      val status = Status(
-        dataEntries.map { case (key, (_, _)) => (key, getDigest(key)) },
-        chunk = 0,
-        totChunks = 1,
-        toSystemUid,
-        selfFromSystemUid)
+      val status = Status(dataEntries.map {
+          case (key, (_, _, usedTimestamp)) => (key, (getDigest(key), 
usedTimestamp))
+        }, chunk = 0, totChunks = 1, toSystemUid, selfFromSystemUid)
       to ! status
     } else {
       val totChunks = dataEntries.size / maxDeltaElements
@@ -1981,7 +2130,8 @@ final class Replicator(settings: ReplicatorSettings) 
extends Actor with ActorLog
         }
         val chunk = (statusCount % totChunks).toInt
         val status = Status(dataEntries.collect {
-            case (key, (_, _)) if math.abs(key.hashCode % totChunks) == chunk 
=> (key, getDigest(key))
+            case (key, (_, _, usedTimestamp)) if math.abs(key.hashCode % 
totChunks) == chunk =>
+              (key, (getDigest(key), usedTimestamp))
           }, chunk, totChunks, toSystemUid, selfFromSystemUid)
         to ! status
         i += 1
@@ -1995,7 +2145,11 @@ final class Replicator(settings: ReplicatorSettings) 
extends Actor with ActorLog
   def replica(node: UniqueAddress): ActorSelection =
     context.actorSelection(self.path.toStringWithAddress(node.address))
 
-  def receiveStatus(otherDigests: Map[KeyId, Digest], chunk: Int, totChunks: 
Int, fromSystemUid: Option[Long]): Unit = {
+  def receiveStatus(
+      otherDigests: Map[KeyId, (Digest, Timestamp)],
+      chunk: Int,
+      totChunks: Int,
+      fromSystemUid: Option[Long]): Unit = {
     if (log.isDebugEnabled)
       log.debug(
         "Received gossip status from [{}], chunk [{}] of [{}] containing 
[{}].",
@@ -2004,18 +2158,32 @@ final class Replicator(settings: ReplicatorSettings) 
extends Actor with ActorLog
         totChunks,
         otherDigests.keys.mkString(", "))
 
+    // update the usedTimestamp when needed
+    if (expiryEnabled) {
+      otherDigests.foreach {
+        case (key, (_, usedTimestamp)) =>
+          updateUsedTimestamp(key, usedTimestamp)
+        // if we don't have the key it will be updated with the full Gossip
+      }
+    }
+
     def isOtherDifferent(key: KeyId, otherDigest: Digest): Boolean = {
       val d = getDigest(key)
       d != NotFoundDigest && d != otherDigest
     }
     val otherDifferentKeys = otherDigests.collect {
-      case (key, otherDigest) if isOtherDifferent(key, otherDigest) => key
+      case (key, (otherDigest, _)) if isOtherDifferent(key, otherDigest) => key
     }
     val otherKeys = otherDigests.keySet
     val myKeys =
       if (totChunks == 1) dataEntries.keySet
       else dataEntries.keysIterator.filter(key => math.abs(key.hashCode % 
totChunks) == chunk).toSet
-    val otherMissingKeys = myKeys.diff(otherKeys)
+    val otherMissingKeys =
+      if (expiryEnabled) {
+        val now = System.currentTimeMillis()
+        myKeys.diff(otherKeys).filterNot(key => isExpired(key, 
getUsedTimestamp(key), now))
+      } else
+        myKeys.diff(otherKeys)
     val keys = (otherDifferentKeys ++ otherMissingKeys).take(maxDeltaElements)
     if (keys.nonEmpty) {
       if (log.isDebugEnabled)
@@ -2033,7 +2201,7 @@ final class Replicator(settings: ReplicatorSettings) 
extends Actor with ActorLog
           replyTo.path.address,
           myMissingKeys.mkString(", "))
       val status = Status(
-        myMissingKeys.iterator.map(k => k -> NotFoundDigest).toMap,
+        myMissingKeys.iterator.map(k => k -> (NotFoundDigest -> 0L)).toMap,
         chunk,
         totChunks,
         fromSystemUid,
@@ -2051,7 +2219,7 @@ final class Replicator(settings: ReplicatorSettings) 
extends Actor with ActorLog
     val maxMessageSize = payloadSizeAggregator.maxFrameSize - 128
 
     var messages = Vector.empty[Gossip]
-    val collectedEntries = Vector.newBuilder[(KeyId, DataEnvelope)]
+    val collectedEntries = Vector.newBuilder[(KeyId, (DataEnvelope, 
Timestamp))]
     var sum = 0
 
     def addGossip(): Unit = {
@@ -2074,12 +2242,12 @@ final class Replicator(settings: ReplicatorSettings) 
extends Actor with ActorLog
 
       val entrySize = keySize + dataSize + envelopeSize
       if (sum + entrySize <= maxMessageSize) {
-        collectedEntries += (key -> dataEnvelope)
+        collectedEntries += (key -> (dataEnvelope -> getUsedTimestamp(key)))
         sum += entrySize
       } else {
         addGossip()
         collectedEntries.clear()
-        collectedEntries += (key -> dataEnvelope)
+        collectedEntries += (key -> (dataEnvelope -> getUsedTimestamp(key)))
         sum = entrySize
       }
     }
@@ -2092,19 +2260,27 @@ final class Replicator(settings: ReplicatorSettings) 
extends Actor with ActorLog
     messages
   }
 
-  def receiveGossip(updatedData: Map[KeyId, DataEnvelope], sendBack: Boolean, 
fromSystemUid: Option[Long]): Unit = {
+  def receiveGossip(
+      updatedData: Map[KeyId, (DataEnvelope, Timestamp)],
+      sendBack: Boolean,
+      fromSystemUid: Option[Long]): Unit = {
     if (log.isDebugEnabled)
       log.debug("Received gossip from [{}], containing [{}].", 
replyTo.path.address, updatedData.keys.mkString(", "))
     var replyKeys = Set.empty[KeyId]
+    val now = if (expiryEnabled) System.currentTimeMillis() else 0L
     updatedData.foreach {
-      case (key, envelope) =>
-        val hadData = dataEntries.contains(key)
-        writeAndStore(key, envelope, reply = false)
-        if (sendBack) getData(key) match {
-          case Some(d) =>
-            if (hadData || d.pruning.nonEmpty)
-              replyKeys += key
-          case None =>
+      case (key, (envelope, usedTimestamp)) =>
+        if (!isExpired(key, usedTimestamp, now)) {
+          val hadData = dataEntries.contains(key)
+          writeAndStore(key, envelope, reply = false)
+          updateUsedTimestamp(key, usedTimestamp)
+
+          if (sendBack) getData(key) match {
+            case Some(d) =>
+              if (hadData || d.pruning.nonEmpty)
+                replyKeys += key
+            case None =>
+          }
         }
     }
     if (sendBack && replyKeys.nonEmpty) {
@@ -2121,6 +2297,39 @@ final class Replicator(settings: ReplicatorSettings) 
extends Actor with ActorLog
     context.watch(subscriber)
   }
 
+  private def cleanupExpired(): Unit = {
+    if (expiryEnabled) {
+      val now = System.currentTimeMillis()
+      // no need to be more accurate than the gossip tick interval
+      currentUsedTimestamp = now
+      val expiredKeys = dataEntries.collect {
+        // it can be 0L when it was set via a DeltaPropagation or Write first 
time, don't expire such immediately
+        case (key, (_, _, usedTimestamp))
+            if usedTimestamp != 0L &&
+            getExpiryDuration(key) != Duration.Zero &&
+            usedTimestamp <= now - getExpiryDuration(key).toMillis =>
+          key
+      }
+
+      if (expiredKeys.nonEmpty) {
+        if (log.isDebugEnabled)
+          log.debug("Removing expired keys [{}]", expiredKeys.mkString(", "))
+
+        val durableExpiredKeys = expiredKeys.filter(isDurable).toSet
+        if (durableExpiredKeys.nonEmpty)
+          durableStore ! DurableStore.Expire(durableExpiredKeys)
+
+        expiredKeys.foreach { key =>
+          deltaPropagationSelector.delete(key)
+          payloadSizeAggregator.remove(key)
+          changed += key // notify subscribers, later
+        }
+
+        dataEntries = dataEntries -- expiredKeys
+      }
+    }
+  }
+
   def receiveUnsubscribe(key: KeyR, subscriber: ActorRef): Unit = {
     subscribers.removeBinding(key.id, subscriber)
     newSubscribers.removeBinding(key.id, subscriber)
@@ -2236,7 +2445,7 @@ final class Replicator(settings: ReplicatorSettings) 
extends Actor with ActorLog
     val knownNodes = allNodes.union(removedNodes.keySet)
     val newRemovedNodes =
       dataEntries.foldLeft(Set.empty[UniqueAddress]) {
-        case (acc, (_, (DataEnvelope(data: RemovedNodePruning, _, _), _))) =>
+        case (acc, (_, (DataEnvelope(data: RemovedNodePruning, _, _), _, _))) 
=>
           acc.union(data.modifiedByNodes.filterNot(n => n == selfUniqueAddress 
|| knownNodes(n)))
         case (acc, _) =>
           acc
@@ -2257,7 +2466,7 @@ final class Replicator(settings: ReplicatorSettings) 
extends Actor with ActorLog
       .to(immutable.Set)
 
     if (removedSet.nonEmpty) {
-      for ((key, (envelope, _)) <- dataEntries; removed <- removedSet) {
+      for ((key, (envelope, _, _)) <- dataEntries; removed <- removedSet) {
 
         def init(): Unit = {
           val newEnvelope = envelope.initRemovedNodePruning(removed, 
selfUniqueAddress)
@@ -2286,7 +2495,7 @@ final class Replicator(settings: ReplicatorSettings) 
extends Actor with ActorLog
     val pruningPerformed = PruningPerformed(System.currentTimeMillis() + 
pruningMarkerTimeToLive.toMillis)
     val durablePruningPerformed = PruningPerformed(System.currentTimeMillis() 
+ durablePruningMarkerTimeToLive.toMillis)
     dataEntries.foreach {
-      case (key, (envelope @ DataEnvelope(data: RemovedNodePruning, pruning, 
_), _)) =>
+      case (key, (envelope @ DataEnvelope(data: RemovedNodePruning, pruning, 
_), _, _)) =>
         pruning.foreach {
           case (removed, PruningInitialized(owner, seen))
               if owner == selfUniqueAddress
@@ -2305,7 +2514,7 @@ final class Replicator(settings: ReplicatorSettings) 
extends Actor with ActorLog
   def deleteObsoletePruningPerformed(): Unit = {
     val currentTime = System.currentTimeMillis()
     dataEntries.foreach {
-      case (key, (envelope @ DataEnvelope(_: RemovedNodePruning, pruning, _), 
_)) =>
+      case (key, (envelope @ DataEnvelope(_: RemovedNodePruning, pruning, _), 
_, _)) =>
         val newEnvelope = pruning.foldLeft(envelope) {
           case (acc, (removed, p: PruningPerformed)) if 
p.isObsolete(currentTime) =>
             log.debug("Removing obsolete pruning marker for [{}] in [{}]", 
removed, key)
diff --git 
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala
 
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala
index 9af89a47f8..bb45747aca 100644
--- 
a/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala
+++ 
b/distributed-data/src/main/scala/org/apache/pekko/cluster/ddata/protobuf/ReplicatorMessageSerializer.scala
@@ -274,12 +274,15 @@ class ReplicatorMessageSerializer(val system: 
ExtendedActorSystem)
     val b = dm.Status.newBuilder()
     b.setChunk(status.chunk).setTotChunks(status.totChunks)
     status.digests.foreach {
-      case (key, digest) =>
-        b.addEntries(
+      case (key, (digest, usedTimestamp)) =>
+        val entryBuilder =
           dm.Status.Entry
             .newBuilder()
             .setKey(key)
-            
.setDigest(ByteStringUtils.toProtoByteStringUnsafe(digest.toArrayUnsafe())))
+            
.setDigest(ByteStringUtils.toProtoByteStringUnsafe(digest.toArrayUnsafe()))
+        if (usedTimestamp != 0L)
+          entryBuilder.setUsedTimestamp(usedTimestamp)
+        b.addEntries(entryBuilder)
     }
     status.toSystemUid.foreach(b.setToSystemUid) // can be None when sending 
back to a node of version 2.5.21
     b.setFromSystemUid(status.fromSystemUid.get)
@@ -292,7 +295,7 @@ class ReplicatorMessageSerializer(val system: 
ExtendedActorSystem)
     val fromSystemUid = if (status.hasFromSystemUid) 
Some(status.getFromSystemUid) else None
     Status(
       status.getEntriesList.asScala.iterator
-        .map(e => e.getKey -> 
PekkoByteString.fromArrayUnsafe(e.getDigest.toByteArray()))
+        .map(e => e.getKey -> 
(PekkoByteString.fromArrayUnsafe(e.getDigest.toByteArray()) -> 
e.getUsedTimestamp))
         .toMap,
       status.getChunk,
       status.getTotChunks,
@@ -303,8 +306,12 @@ class ReplicatorMessageSerializer(val system: 
ExtendedActorSystem)
   private def gossipToProto(gossip: Gossip): dm.Gossip = {
     val b = dm.Gossip.newBuilder().setSendBack(gossip.sendBack)
     gossip.updatedData.foreach {
-      case (key, data) =>
-        
b.addEntries(dm.Gossip.Entry.newBuilder().setKey(key).setEnvelope(dataEnvelopeToProto(data)))
+      case (key, (data, usedTimestamp)) =>
+        val entryBuilder =
+          
dm.Gossip.Entry.newBuilder().setKey(key).setEnvelope(dataEnvelopeToProto(data))
+        if (usedTimestamp != 0L)
+          entryBuilder.setUsedTimestamp(usedTimestamp)
+        b.addEntries(entryBuilder)
     }
     gossip.toSystemUid.foreach(b.setToSystemUid) // can be None when sending 
back to a node of version 2.5.21
     b.setFromSystemUid(gossip.fromSystemUid.get)
@@ -316,7 +323,9 @@ class ReplicatorMessageSerializer(val system: 
ExtendedActorSystem)
     val toSystemUid = if (gossip.hasToSystemUid) Some(gossip.getToSystemUid) 
else None
     val fromSystemUid = if (gossip.hasFromSystemUid) 
Some(gossip.getFromSystemUid) else None
     Gossip(
-      gossip.getEntriesList.asScala.iterator.map(e => e.getKey -> 
dataEnvelopeFromProto(e.getEnvelope)).toMap,
+      gossip.getEntriesList.asScala.iterator
+        .map(e => e.getKey -> (dataEnvelopeFromProto(e.getEnvelope) -> 
e.getUsedTimestamp))
+        .toMap,
       sendBack = gossip.getSendBack,
       toSystemUid,
       fromSystemUid)
diff --git 
a/distributed-data/src/test/scala/org/apache/pekko/cluster/ddata/ReplicatorSettingsSpec.scala
 
b/distributed-data/src/test/scala/org/apache/pekko/cluster/ddata/ReplicatorSettingsSpec.scala
index 17b02d371f..7ce2daa36a 100644
--- 
a/distributed-data/src/test/scala/org/apache/pekko/cluster/ddata/ReplicatorSettingsSpec.scala
+++ 
b/distributed-data/src/test/scala/org/apache/pekko/cluster/ddata/ReplicatorSettingsSpec.scala
@@ -41,5 +41,21 @@ class ReplicatorSettingsSpec
     "have the prefixed replicator name" in {
       ReplicatorSettings.name(system, Some("other")) should 
===("otherDdataReplicator")
     }
+    "parse expire-keys-after-inactivity config" in {
+      val config = ConfigFactory.parseString("""
+        pekko.cluster.distributed-data.expire-keys-after-inactivity {
+          "key-1" = 10 minutes
+          "cache-*" = 2 minutes
+        }
+        
""").withFallback(ReplicatorSettingsSpec.config).withFallback(system.settings.config)
+      val settings = 
ReplicatorSettings(config.getConfig("pekko.cluster.distributed-data"))
+      settings.expiryKeys should have size 2
+      settings.expiryKeys("key-1").toMinutes should ===(10L)
+      settings.expiryKeys("cache-*").toMinutes should ===(2L)
+    }
+    "have empty expiry keys by default" in {
+      val settings = ReplicatorSettings(system)
+      settings.expiryKeys should be(empty)
+    }
   }
 }
diff --git 
a/distributed-data/src/test/scala/org/apache/pekko/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala
 
b/distributed-data/src/test/scala/org/apache/pekko/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala
index a54adb163d..9a5648a414 100644
--- 
a/distributed-data/src/test/scala/org/apache/pekko/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala
+++ 
b/distributed-data/src/test/scala/org/apache/pekko/cluster/ddata/protobuf/ReplicatorMessageSerializerSpec.scala
@@ -123,30 +123,43 @@ class ReplicatorMessageSerializerSpec
       checkSerialization(ReadResult(None))
       checkSerialization(
         Status(
-          Map("A" -> ByteString.fromString("a"), "B" -> 
ByteString.fromString("b")),
+          Map("A" -> (ByteString.fromString("a") -> 0L), "B" -> 
(ByteString.fromString("b") -> 0L)),
           chunk = 3,
           totChunks = 10,
           Some(17),
           Some(19)))
       checkSerialization(
         Status(
-          Map("A" -> ByteString.fromString("a"), "B" -> 
ByteString.fromString("b")),
+          Map("A" -> (ByteString.fromString("a") -> 0L), "B" -> 
(ByteString.fromString("b") -> 0L)),
           chunk = 3,
           totChunks = 10,
           None, // can be None when sending back to a node of version 2.5.21
           Some(19)))
+      checkSerialization(
+        Status(
+          Map("A" -> (ByteString.fromString("a") -> 12345L), "B" -> 
(ByteString.fromString("b") -> 67890L)),
+          chunk = 3,
+          totChunks = 10,
+          Some(17),
+          Some(19)))
       checkSerialization(
         Gossip(
-          Map("A" -> DataEnvelope(data1), "B" -> DataEnvelope(GSet() + "b" + 
"c")),
+          Map("A" -> (DataEnvelope(data1) -> 0L), "B" -> (DataEnvelope(GSet() 
+ "b" + "c") -> 0L)),
           sendBack = true,
           Some(17),
           Some(19)))
       checkSerialization(
         Gossip(
-          Map("A" -> DataEnvelope(data1), "B" -> DataEnvelope(GSet() + "b" + 
"c")),
+          Map("A" -> (DataEnvelope(data1) -> 0L), "B" -> (DataEnvelope(GSet() 
+ "b" + "c") -> 0L)),
           sendBack = true,
           None, // can be None when sending back to a node of version 2.5.21
           Some(19)))
+      checkSerialization(
+        Gossip(
+          Map("A" -> (DataEnvelope(data1) -> 12345L), "B" -> 
(DataEnvelope(GSet() + "b" + "c") -> 67890L)),
+          sendBack = true,
+          Some(17),
+          Some(19)))
       checkSerialization(
         DeltaPropagation(
           address1,


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to