This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch 1.2.x
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/1.2.x by this push:
     new 3b767671c8 Add AsyncWriteJournal option for disabling Resequencer 
(#2026) (#2027) (#2043)
3b767671c8 is described below

commit 3b767671c83639f2fbf227153f81c402cda84251
Author: PJ Fanning <[email protected]>
AuthorDate: Fri Aug 22 08:14:54 2025 +0100

    Add AsyncWriteJournal option for disabling Resequencer (#2026) (#2027) 
(#2043)
    
    * Add AsyncWriteJournal option for disabling Resequencer (#2026)
    
    * Add @InternalApi as requested in review
    
    * Update AsyncWriteJournal.scala
    
    ---------
    
    Co-authored-by: Mikhail Sokolov <[email protected]>
---
 persistence/src/main/resources/reference.conf      |  14 +++
 .../persistence/journal/AsyncWriteJournal.scala    |  28 +++--
 .../AsyncWriteJournalResponseOrderSpec.scala       | 131 +++++++++++++++++++++
 3 files changed, 166 insertions(+), 7 deletions(-)

diff --git a/persistence/src/main/resources/reference.conf 
b/persistence/src/main/resources/reference.conf
index 145257bec0..e3f3694267 100644
--- a/persistence/src/main/resources/reference.conf
+++ b/persistence/src/main/resources/reference.conf
@@ -151,6 +151,20 @@ pekko.persistence {
         # replayed event.
         debug = off
       }
+
+      # Controls whether the journal plugin sends back write responses in the 
same order
+      # as it received requests.
+      #
+      # Originally Akka-Persistence implementation rearranged responses to 
match the request order.
+      # But this feature wasn't guaranteed by the Akka's test suite, and 
nothing in Akka itself relied on it.
+      #
+      # As this ordering is global, slow write requests for some entities can 
stall writes for all,
+      # which can cause latency issues under load.
+      #
+      # The old behaviour is still enabled by default ("on"). After more 
testing on existing applications,
+      # the default might be switched to "off", and eventually this option 
might be removed altogeter, leaving
+      # "off" the only behaviour available.
+      write-response-global-order = on
     }
 
     # Fallback settings for snapshot store plugin configurations
diff --git 
a/persistence/src/main/scala/org/apache/pekko/persistence/journal/AsyncWriteJournal.scala
 
b/persistence/src/main/scala/org/apache/pekko/persistence/journal/AsyncWriteJournal.scala
index 9bede9e068..ae27bd5c4f 100644
--- 
a/persistence/src/main/scala/org/apache/pekko/persistence/journal/AsyncWriteJournal.scala
+++ 
b/persistence/src/main/scala/org/apache/pekko/persistence/journal/AsyncWriteJournal.scala
@@ -22,6 +22,7 @@ import scala.util.control.NonFatal
 
 import org.apache.pekko
 import pekko.actor._
+import pekko.annotation.InternalApi
 import pekko.pattern.CircuitBreaker
 import pekko.pattern.pipe
 import pekko.persistence._
@@ -67,9 +68,22 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase 
with AsyncRecovery {
   final val receiveWriteJournal: Actor.Receive = {
     // cannot be a val in the trait due to binary compatibility
     val replayDebugEnabled: Boolean = config.getBoolean("replay-filter.debug")
+    val enableGlobalWriteResponseOrder: Boolean = 
config.getBoolean("write-response-global-order")
+
     val eventStream = context.system.eventStream // used from Future callbacks
     implicit val ec: ExecutionContext = context.dispatcher
 
+    // should be a private method in the trait, but it needs the 
enableGlobalWriteResponseOrder field which can't be
+    // moved to the trait level because adding any fields there breaks 
bincompat
+    @InternalApi
+    def sendWriteResponse(msg: Any, snr: Long, target: ActorRef, sender: 
ActorRef): Unit = {
+      if (enableGlobalWriteResponseOrder) {
+        resequencer ! Desequenced(msg, snr, target, sender)
+      } else {
+        target.tell(msg, sender)
+      }
+    }
+
     {
       case WriteMessages(messages, persistentActor, actorInstanceId) =>
         val cctr = resequencerCounter
@@ -100,7 +114,7 @@ trait AsyncWriteJournal extends Actor with WriteJournalBase 
with AsyncRecovery {
 
         writeResult.onComplete {
           case Success(results) =>
-            resequencer ! Desequenced(WriteMessagesSuccessful, cctr, 
persistentActor, self)
+            sendWriteResponse(WriteMessagesSuccessful, cctr, persistentActor, 
self)
 
             val resultsIter =
               if (results.isEmpty) 
Iterator.fill(atomicWriteCount)(AsyncWriteJournal.successUnit)
@@ -111,12 +125,12 @@ trait AsyncWriteJournal extends Actor with 
WriteJournalBase with AsyncRecovery {
                 resultsIter.next() match {
                   case Success(_) =>
                     a.payload.foreach { p =>
-                      resequencer ! Desequenced(WriteMessageSuccess(p, 
actorInstanceId), n, persistentActor, p.sender)
+                      sendWriteResponse(WriteMessageSuccess(p, 
actorInstanceId), n, persistentActor, p.sender)
                       n += 1
                     }
                   case Failure(e) =>
                     a.payload.foreach { p =>
-                      resequencer ! Desequenced(
+                      sendWriteResponse(
                         WriteMessageRejected(p, e, actorInstanceId),
                         n,
                         persistentActor,
@@ -126,21 +140,21 @@ trait AsyncWriteJournal extends Actor with 
WriteJournalBase with AsyncRecovery {
                 }
 
               case r: NonPersistentRepr =>
-                resequencer ! Desequenced(LoopMessageSuccess(r.payload, 
actorInstanceId), n, persistentActor, r.sender)
+                sendWriteResponse(LoopMessageSuccess(r.payload, 
actorInstanceId), n, persistentActor, r.sender)
                 n += 1
             }
 
           case Failure(e) =>
-            resequencer ! Desequenced(WriteMessagesFailed(e, 
atomicWriteCount), cctr, persistentActor, self)
+            sendWriteResponse(WriteMessagesFailed(e, atomicWriteCount), cctr, 
persistentActor, self)
             var n = cctr + 1
             messages.foreach {
               case a: AtomicWrite =>
                 a.payload.foreach { p =>
-                  resequencer ! Desequenced(WriteMessageFailure(p, e, 
actorInstanceId), n, persistentActor, p.sender)
+                  sendWriteResponse(WriteMessageFailure(p, e, 
actorInstanceId), n, persistentActor, p.sender)
                   n += 1
                 }
               case r: NonPersistentRepr =>
-                resequencer ! Desequenced(LoopMessageSuccess(r.payload, 
actorInstanceId), n, persistentActor, r.sender)
+                sendWriteResponse(LoopMessageSuccess(r.payload, 
actorInstanceId), n, persistentActor, r.sender)
                 n += 1
             }
         }
diff --git 
a/persistence/src/test/scala/org/apache/pekko/persistence/journal/AsyncWriteJournalResponseOrderSpec.scala
 
b/persistence/src/test/scala/org/apache/pekko/persistence/journal/AsyncWriteJournalResponseOrderSpec.scala
new file mode 100644
index 0000000000..a0faa633c0
--- /dev/null
+++ 
b/persistence/src/test/scala/org/apache/pekko/persistence/journal/AsyncWriteJournalResponseOrderSpec.scala
@@ -0,0 +1,131 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.persistence.journal
+
+import org.apache.pekko.persistence.{ AtomicWrite, JournalProtocol, 
PersistenceSpec, PersistentRepr }
+import org.apache.pekko.testkit.ImplicitSender
+
+import scala.collection.immutable
+import scala.concurrent.{ ExecutionContext, Future, Promise }
+import scala.util.Try
+
+/**
+ * Verifies write response ordering logic for [[AsyncWriteJournal]].
+ *
+ * Checkout write-response-global-order config option for more information.
+ */
+class AsyncWriteJournalResponseOrderSpec
+    extends PersistenceSpec(
+      PersistenceSpec.config(
+        plugin = "", // we will provide explicit plugin IDs later
+        test = classOf[AsyncWriteJournalResponseOrderSpec].getSimpleName,
+        extraConfig = Some(
+          s"""
+          |pekko.persistence.journal.reverse-plugin {
+          |  with-global-order {
+          |    class = 
"${classOf[AsyncWriteJournalResponseOrderSpec.ReversePlugin].getName}"
+          |    
+          |    write-response-global-order = on
+          |  }
+          |  no-global-order {
+          |    class = 
"${classOf[AsyncWriteJournalResponseOrderSpec.ReversePlugin].getName}"
+          |    
+          |    write-response-global-order = off
+          |  }
+          |}
+          |""".stripMargin
+        ))) with ImplicitSender {
+
+  import AsyncWriteJournalResponseOrderSpec._
+
+  "AsyncWriteJournal" must {
+    "return write responses in request order if global response order is 
enabled" in {
+      val pluginRef =
+        extension.journalFor(journalPluginId = 
"pekko.persistence.journal.reverse-plugin.with-global-order")
+
+      pluginRef ! mkWriteMessages(1)
+      pluginRef ! mkWriteMessages(2)
+      pluginRef ! mkWriteMessages(3)
+
+      pluginRef ! CompleteWriteOps
+
+      getMessageNumsFromResponses(receiveN(6)) shouldEqual Vector(1, 2, 3)
+    }
+
+    "return write responses in completion order if global response order is 
disabled" in {
+      val pluginRef =
+        extension.journalFor(journalPluginId = 
"pekko.persistence.journal.reverse-plugin.no-global-order")
+
+      pluginRef ! mkWriteMessages(1)
+      pluginRef ! mkWriteMessages(2)
+      pluginRef ! mkWriteMessages(3)
+
+      pluginRef ! CompleteWriteOps
+
+      getMessageNumsFromResponses(receiveN(6)) shouldEqual Vector(3, 2, 1)
+    }
+  }
+
+  private def mkWriteMessages(num: Int): JournalProtocol.WriteMessages = 
JournalProtocol.WriteMessages(
+    messages = Vector(AtomicWrite(PersistentRepr(
+      payload = num,
+      sequenceNr = 0L,
+      persistenceId = num.toString
+    ))),
+    persistentActor = self,
+    actorInstanceId = 1
+  )
+
+  private def getMessageNumsFromResponses(responses: Seq[AnyRef]): Vector[Int] 
= responses.collect {
+    case successResponse: JournalProtocol.WriteMessageSuccess =>
+      successResponse.persistent.payload.asInstanceOf[Int]
+  }.toVector
+}
+
+private object AsyncWriteJournalResponseOrderSpec {
+  case object CompleteWriteOps
+
+  /**
+   * Accumulates asyncWriteMessages requests and completes them in reverse 
receive order on [[CompleteWriteOps]] command
+   */
+  class ReversePlugin extends AsyncWriteJournal {
+
+    private implicit val ec: ExecutionContext = context.dispatcher
+
+    private var pendingOps: Vector[Promise[Unit]] = Vector.empty
+
+    override def receivePluginInternal: Receive = {
+      case CompleteWriteOps =>
+        pendingOps.reverse.foreach(_.success(()))
+        pendingOps = Vector.empty
+    }
+
+    override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): 
Future[immutable.Seq[Try[Unit]]] = {
+      val responsePromise = Promise[Unit]()
+      pendingOps = pendingOps :+ responsePromise
+      responsePromise.future.map(_ => Vector.empty)
+    }
+
+    override def asyncDeleteMessagesTo(persistenceId: String, toSequenceNr: 
Long): Future[Unit] = ???
+
+    override def asyncReplayMessages(persistenceId: String, fromSequenceNr: 
Long, toSequenceNr: Long, max: Long)(
+        recoveryCallback: PersistentRepr => Unit): Future[Unit] = ???
+
+    override def asyncReadHighestSequenceNr(persistenceId: String, 
fromSequenceNr: Long): Future[Long] = ???
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to