This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch 1.2.x
in repository https://gitbox.apache.org/repos/asf/pekko.git


The following commit(s) were added to refs/heads/1.2.x by this push:
     new f97bdbd21e Add `JournalPersistFailed` and `JournalPersistRejected` signals (#1961) (#2032)
f97bdbd21e is described below

commit f97bdbd21e402ab0a3b667cff7813c7647b28a17
Author: PJ Fanning <[email protected]>
AuthorDate: Sat Aug 23 10:26:22 2025 +0100

    Add `JournalPersistFailed` and `JournalPersistRejected` signals (#1961) (#2032)
    
    * Add `JournalPersistFailed` and `JournalPersistRejected` signals with debug logging when emitted
    
    * Add tests for `JournalPersistFailed` and `JournalPersistRejected` signals
    
    * fix comments, remove pekko version
    
    * applyCodeStyle
    
    * change license
    
    Co-authored-by: Ilya Kachalsky <[email protected]>
---
 .../scaladsl/EventSourcedBehaviorSignalSpec.scala  | 213 +++++++++++++++++++++
 .../persistence/typed/EventSourcedSignal.scala     |  16 ++
 .../pekko/persistence/typed/internal/Running.scala |  10 +
 3 files changed, 239 insertions(+)

diff --git a/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorSignalSpec.scala b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorSignalSpec.scala
new file mode 100644
index 0000000000..d5176e5fd0
--- /dev/null
+++ b/persistence-typed-tests/src/test/scala/org/apache/pekko/persistence/typed/scaladsl/EventSourcedBehaviorSignalSpec.scala
@@ -0,0 +1,213 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.pekko.persistence.typed.scaladsl
+
+import org.apache.pekko
+import pekko.actor.testkit.typed.TestException
+import pekko.actor.testkit.typed.scaladsl._
+import pekko.actor.typed.ActorRef
+import pekko.actor.typed.Behavior
+import pekko.persistence.AtomicWrite
+import pekko.persistence.journal.inmem.InmemJournal
+import pekko.persistence.typed.JournalPersistFailed
+import pekko.persistence.typed.JournalPersistRejected
+import pekko.persistence.typed.PersistenceId
+import pekko.persistence.typed.RecoveryCompleted
+import pekko.serialization.jackson.CborSerializable
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
+import org.scalatest.wordspec.AnyWordSpecLike
+
+import scala.collection.immutable
+import scala.concurrent.Future
+import scala.concurrent.duration._
+import scala.util.Try
+
+// Custom journal that checks event flags to determine whether to reject or 
fail writes
+class SignalTestJournal extends InmemJournal {
+  override def asyncWriteMessages(messages: immutable.Seq[AtomicWrite]): 
Future[immutable.Seq[Try[Unit]]] = {
+    // Check if any of the events have the shouldReject or shouldFail flag set
+    val shouldReject = messages.exists { atomicWrite =>
+      atomicWrite.payload.exists { persistentRepr =>
+        persistentRepr.payload match {
+          case event: EventSourcedBehaviorSignalSpec.Incremented => 
event.shouldReject
+          case _                                                 => false
+        }
+      }
+    }
+
+    val shouldFail = messages.exists { atomicWrite =>
+      atomicWrite.payload.exists { persistentRepr =>
+        persistentRepr.payload match {
+          case event: EventSourcedBehaviorSignalSpec.Incremented => 
event.shouldFail
+          case _                                                 => false
+        }
+      }
+    }
+
+    if (shouldReject) {
+      // Return a successful future with a failed Try to simulate rejection
+      Future.successful(messages.map(_ =>
+        Try { throw TestException("Journal rejected the event") }
+      ))
+    } else if (shouldFail) {
+      // Return a failed future to simulate journal failure
+      Future.failed(TestException("Journal failed to persist the event"))
+    } else {
+      super.asyncWriteMessages(messages)
+    }
+  }
+}
+
+object EventSourcedBehaviorSignalSpec {
+  // Commands
+  sealed trait Command extends CborSerializable
+  case object Increment extends Command
+  case object IncrementWithReject extends Command
+  case object IncrementWithFailure extends Command
+
+  // Events
+  sealed trait Event extends CborSerializable
+  final case class Incremented(delta: Int, shouldReject: Boolean = false, 
shouldFail: Boolean = false) extends Event
+
+  // State
+  final case class State(value: Int) extends CborSerializable
+
+  // Configuration for tests
+  val config: Config = ConfigFactory.parseString(s"""
+    pekko.loglevel = INFO
+    pekko.persistence.journal.plugin = "signal-test-journal"
+    signal-test-journal = $${pekko.persistence.journal.inmem}
+    signal-test-journal {
+      class = "${classOf[SignalTestJournal].getName}"
+    }
+    """).withFallback(ConfigFactory.defaultReference()).resolve()
+
+  // Create a behavior that tracks signals
+  def signalTrackingBehavior(
+      persistenceId: PersistenceId,
+      signalProbe: ActorRef[String]): Behavior[Command] = {
+
+    // Create the base EventSourcedBehavior
+    val eventSourcedBehavior = EventSourcedBehavior[Command, Event, State](
+      persistenceId,
+      emptyState = State(0),
+      commandHandler = (_, cmd) =>
+        cmd match {
+          case Increment =>
+            Effect.persist(Incremented(1))
+          case IncrementWithReject =>
+            // Create an event that signals it should be rejected
+            Effect.persist(Incremented(1, shouldReject = true))
+          case IncrementWithFailure =>
+            // Create an event that signals it should fail
+            Effect.persist(Incremented(1, shouldFail = true))
+        },
+      eventHandler = (state, evt) =>
+        evt match {
+          case Incremented(delta, _, _) =>
+            State(state.value + delta)
+        })
+      .receiveSignal {
+        case (_, RecoveryCompleted) =>
+          signalProbe ! "RecoveryCompleted"
+        case (_, signal: JournalPersistRejected) =>
+          val message = s"JournalPersistRejected: ${signal.failure.getMessage}"
+          signalProbe ! message
+        case (_, signal: JournalPersistFailed) =>
+          val message = s"JournalPersistFailed: ${signal.failure.getMessage}"
+          signalProbe ! message
+      }
+
+    // We don't need to handle failures with supervision since we're only 
testing the signals
+    eventSourcedBehavior
+  }
+}
+
+class EventSourcedBehaviorSignalSpec
+    extends ScalaTestWithActorTestKit(EventSourcedBehaviorSignalSpec.config)
+    with AnyWordSpecLike
+    with LogCapturing {
+
+  import EventSourcedBehaviorSignalSpec._
+
+  private val pidCounter = new java.util.concurrent.atomic.AtomicInteger(0)
+  private def nextPid(): PersistenceId = 
PersistenceId.ofUniqueId(s"signal-test-${pidCounter.incrementAndGet()}")
+
+  "An EventSourcedBehavior" must {
+    "receive JournalPersistRejected signal when journal rejects events" in {
+      // Create a probe to track signals
+      val signalProbe = createTestProbe[String]()
+
+      // Create a behavior that will track signals
+      val behavior = signalTrackingBehavior(nextPid(), signalProbe.ref)
+
+      // Spawn the actor
+      val actor = spawn(behavior)
+
+      // Wait for recovery to complete
+      signalProbe.expectMessage("RecoveryCompleted")
+
+      // Send a command that will trigger a rejection
+      actor ! IncrementWithReject
+
+      // Verify that the JournalPersistRejected signal was received
+      signalProbe.expectMessage(5.seconds, "JournalPersistRejected: Journal 
rejected the event")
+    }
+
+    "receive JournalPersistFailed signal when journal fails to persist events" 
in {
+      // Create a probe to track signals
+      val signalProbe = createTestProbe[String]()
+
+      // Create a behavior that will track signals
+      val behavior = signalTrackingBehavior(nextPid(), signalProbe.ref)
+
+      // Spawn the actor
+      val actor = spawn(behavior)
+
+      // Wait for recovery to complete
+      signalProbe.expectMessage("RecoveryCompleted")
+
+      // Send a command that will trigger a failure
+      actor ! IncrementWithFailure
+
+      // Verify that the JournalPersistFailed signal was received
+      signalProbe.expectMessage(5.seconds, "JournalPersistFailed: Journal 
failed to persist the event")
+    }
+
+    "receive no signal when journal doesn't fail" in {
+      // Create a probe to track signals
+      val signalProbe = createTestProbe[String]()
+
+      // Create a behavior that will track signals
+      val behavior = signalTrackingBehavior(nextPid(), signalProbe.ref)
+
+      // Spawn the actor
+      val actor = spawn(behavior)
+
+      // Wait for recovery to complete
+      signalProbe.expectMessage("RecoveryCompleted")
+
+      // Send a command that won't trigger a failure
+      actor ! Increment
+
+      // Verify that no signal was emitted
+      signalProbe.expectNoMessage(5.seconds)
+    }
+  }
+}
diff --git a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/EventSourcedSignal.scala b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/EventSourcedSignal.scala
index f153daaa35..529bdeeb69 100644
--- a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/EventSourcedSignal.scala
+++ b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/EventSourcedSignal.scala
@@ -39,6 +39,22 @@ final case class RecoveryFailed(failure: Throwable) extends 
EventSourcedSignal {
   def getFailure(): Throwable = failure
 }
 
+final case class JournalPersistFailed(failure: Throwable) extends 
EventSourcedSignal {
+
+  /**
+   * Java API
+   */
+  def getFailure(): Throwable = failure
+}
+
+final case class JournalPersistRejected(failure: Throwable) extends 
EventSourcedSignal {
+
+  /**
+   * Java API
+   */
+  def getFailure(): Throwable = failure
+}
+
 final case class SnapshotCompleted(metadata: SnapshotMetadata) extends 
EventSourcedSignal {
 
   /**
diff --git a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala
index bae282c033..23d3cbc8b9 100644
--- a/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala
+++ b/persistence-typed/src/main/scala/org/apache/pekko/persistence/typed/internal/Running.scala
@@ -52,6 +52,8 @@ import pekko.persistence.typed.{
   DeleteSnapshotsFailed,
   DeletionTarget,
   EventRejectedException,
+  JournalPersistFailed,
+  JournalPersistRejected,
   PersistenceId,
   SnapshotCompleted,
   SnapshotFailed,
@@ -816,12 +818,20 @@ private[pekko] object Running {
         case WriteMessageRejected(p, cause, id) =>
           if (id == setup.writerIdentity.instanceId) {
             onWriteRejected(setup.context, cause, p)
+            val signal = JournalPersistRejected(cause)
+            if (setup.onSignal(state.state, signal, catchAndLog = false)) {
+              setup.internalLogger.debug("Emitted signal [{}].", signal)
+            }
             throw new EventRejectedException(setup.persistenceId, 
p.sequenceNr, cause)
           } else this
 
         case WriteMessageFailure(p, cause, id) =>
           if (id == setup.writerIdentity.instanceId) {
             onWriteFailed(setup.context, cause, p)
+            val signal = JournalPersistFailed(cause)
+            if (setup.onSignal(state.state, signal, catchAndLog = false)) {
+              setup.internalLogger.debug("Emitted signal [{}].", signal)
+            }
             throw new JournalFailureException(setup.persistenceId, 
p.sequenceNr, p.payload.getClass.getName, cause)
           } else this
 


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to