This is an automated email from the ASF dual-hosted git repository.

pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-projection.git


The following commit(s) were added to refs/heads/main by this push:
     new 4b19e82  Port CanTriggerReplay and ScalaBySlicesSourceProviderAdapter from akka-projection 1.4.x (#476)
4b19e82 is described below

commit 4b19e82790064bd278dad11e05a299703a7157bc
Author: PJ Fanning <[email protected]>
AuthorDate: Fri May 15 14:34:37 2026 +0100

    Port CanTriggerReplay and ScalaBySlicesSourceProviderAdapter from akka-projection 1.4.x (#476)
    
    Agent-Logs-Url: https://github.com/pjfanning/incubator-pekko-projection/sessions/5ed34547-44f6-4f51-ad38-2ce0babfb1be
    
    Co-authored-by: copilot-swe-agent[bot] <[email protected]>
    Co-authored-by: pjfanning <[email protected]>
---
 .../projection/internal/CanTriggerReplay.scala     | 24 ++++++++++++++++++++
 .../internal/SourceProviderAdapter.scala           | 26 ++++++++++++++++++++++
 .../javadsl/EventSourcedProvider.scala             | 12 +++++++++-
 .../scaladsl/EventSourcedProvider.scala            | 12 +++++++++-
 4 files changed, 72 insertions(+), 2 deletions(-)

diff --git a/core/src/main/scala/org/apache/pekko/projection/internal/CanTriggerReplay.scala b/core/src/main/scala/org/apache/pekko/projection/internal/CanTriggerReplay.scala
new file mode 100644
index 0000000..5f53c4a
--- /dev/null
+++ b/core/src/main/scala/org/apache/pekko/projection/internal/CanTriggerReplay.scala
@@ -0,0 +1,24 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2009-2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.projection.internal
+
+import org.apache.pekko.annotation.InternalApi
+
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[pekko] trait CanTriggerReplay {
+  private[pekko] def triggerReplay(persistenceId: String, fromSeqNr: Long): Unit
+}
diff --git a/core/src/main/scala/org/apache/pekko/projection/internal/SourceProviderAdapter.scala b/core/src/main/scala/org/apache/pekko/projection/internal/SourceProviderAdapter.scala
index 5535653..91f8170 100644
--- a/core/src/main/scala/org/apache/pekko/projection/internal/SourceProviderAdapter.scala
+++ b/core/src/main/scala/org/apache/pekko/projection/internal/SourceProviderAdapter.scala
@@ -24,9 +24,11 @@ import scala.jdk.OptionConverters._
 import org.apache.pekko
 import pekko.NotUsed
 import pekko.annotation.InternalApi
+import pekko.projection.BySlicesSourceProvider
 import pekko.projection.javadsl
 import pekko.projection.scaladsl
 import pekko.stream.scaladsl.Source
+import pekko.stream.javadsl.{ Source => JSource }
 
 /**
  * INTERNAL API: Adapter from javadsl.SourceProvider to scaladsl.SourceProvider
@@ -49,3 +51,27 @@ import pekko.stream.scaladsl.Source
 
   def extractCreationTime(envelope: Envelope): Long = delegate.extractCreationTime(envelope)
 }
+
+/**
+ * INTERNAL API: Adapter from scaladsl.SourceProvider with BySlicesSourceProvider to javadsl.SourceProvider with BySlicesSourceProvider
+ */
+@InternalApi private[projection] class ScalaBySlicesSourceProviderAdapter[Offset, Envelope](
+    delegate: scaladsl.SourceProvider[Offset, Envelope] with BySlicesSourceProvider)
+    extends javadsl.SourceProvider[Offset, Envelope]
+    with BySlicesSourceProvider {
+  override def source(
+      offset: Supplier[CompletionStage[Optional[Offset]]])
+      : CompletionStage[JSource[Envelope, NotUsed]] =
+    delegate
+      .source(() => offset.get().asScala.map(_.toScala)(ExecutionContext.parasitic))
+      .map(_.asJava)(ExecutionContext.parasitic)
+      .asJava
+
+  override def extractOffset(envelope: Envelope): Offset = delegate.extractOffset(envelope)
+
+  override def extractCreationTime(envelope: Envelope): Long = delegate.extractCreationTime(envelope)
+
+  def minSlice: Int = delegate.minSlice
+
+  def maxSlice: Int = delegate.maxSlice
+}
diff --git a/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/javadsl/EventSourcedProvider.scala b/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/javadsl/EventSourcedProvider.scala
index 77581e4..c3d1dd9 100644
--- a/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/javadsl/EventSourcedProvider.scala
+++ b/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/javadsl/EventSourcedProvider.scala
@@ -39,6 +39,7 @@ import pekko.persistence.query.typed.javadsl.EventsBySliceQuery
 import pekko.persistence.query.typed.javadsl.LoadEventQuery
 import pekko.projection.BySlicesSourceProvider
 import pekko.projection.eventsourced.EventEnvelope
+import pekko.projection.internal.CanTriggerReplay
 import pekko.projection.javadsl
 import pekko.projection.javadsl.SourceProvider
 import pekko.stream.javadsl.Source
@@ -127,7 +128,16 @@ object EventSourcedProvider {
       entityType: String,
       minSlice: Int,
      maxSlice: Int): SourceProvider[Offset, pekko.persistence.query.typed.EventEnvelope[Event]] = {
-    new EventsBySlicesSourceProvider(eventsBySlicesQuery, entityType, minSlice, maxSlice, system)
+    eventsBySlicesQuery match {
+      case query: EventsBySliceQuery with CanTriggerReplay =>
+        new EventsBySlicesSourceProvider[Event](eventsBySlicesQuery, entityType, minSlice, maxSlice, system)
+          with CanTriggerReplay {
+          private[pekko] override def triggerReplay(persistenceId: String, fromSeqNr: Long): Unit =
+            query.triggerReplay(persistenceId, fromSeqNr)
+        }
+      case _ =>
+        new EventsBySlicesSourceProvider(eventsBySlicesQuery, entityType, minSlice, maxSlice, system)
+    }
   }
 
   def sliceForPersistenceId(system: ActorSystem[_], readJournalPluginId: String, persistenceId: String): Int =
diff --git a/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/scaladsl/EventSourcedProvider.scala b/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/scaladsl/EventSourcedProvider.scala
index 5a45e0f..19aa62a 100644
--- a/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/scaladsl/EventSourcedProvider.scala
+++ b/eventsourced/src/main/scala/org/apache/pekko/projection/eventsourced/scaladsl/EventSourcedProvider.scala
@@ -32,6 +32,7 @@ import pekko.persistence.query.typed.scaladsl.EventsBySliceQuery
 import pekko.persistence.query.typed.scaladsl.LoadEventQuery
 import pekko.projection.BySlicesSourceProvider
 import pekko.projection.eventsourced.EventEnvelope
+import pekko.projection.internal.CanTriggerReplay
 import pekko.projection.scaladsl.SourceProvider
 import pekko.stream.scaladsl.Source
 
@@ -113,7 +114,16 @@ object EventSourcedProvider {
       entityType: String,
       minSlice: Int,
      maxSlice: Int): SourceProvider[Offset, pekko.persistence.query.typed.EventEnvelope[Event]] = {
-    new EventsBySlicesSourceProvider(eventsBySlicesQuery, entityType, minSlice, maxSlice, system)
+    eventsBySlicesQuery match {
+      case query: EventsBySliceQuery with CanTriggerReplay =>
+        new EventsBySlicesSourceProvider[Event](eventsBySlicesQuery, entityType, minSlice, maxSlice, system)
+          with CanTriggerReplay {
+          private[pekko] override def triggerReplay(persistenceId: String, fromSeqNr: Long): Unit =
+            query.triggerReplay(persistenceId, fromSeqNr)
+        }
+      case _ =>
+        new EventsBySlicesSourceProvider(eventsBySlicesQuery, entityType, minSlice, maxSlice, system)
+    }
   }
 
   def sliceForPersistenceId(system: ActorSystem[_], readJournalPluginId: String, persistenceId: String): Int =


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to