This is an automated email from the ASF dual-hosted git repository.

pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-projection.git


The following commit(s) were added to refs/heads/main by this push:
     new be405ac  add grpc replication tests (#487)
be405ac is described below

commit be405acfae09ed0edb78189c833915176e0bb7c4
Author: PJ Fanning <[email protected]>
AuthorDate: Fri May 22 09:31:41 2026 +0100

    add grpc replication tests (#487)
    
    * add grpc replication tests
    
    * Update ReplicationJavaDSLIntegrationSpec.scala
    
    * Update build.sbt
---
 build.sbt                                          |   7 +-
 .../replication/ReplicationIntegrationSpec.scala   | 330 +++++++++++++++++++++
 .../ReplicationJavaDSLIntegrationSpec.scala        | 322 ++++++++++++++++++++
 3 files changed, 656 insertions(+), 3 deletions(-)

diff --git a/build.sbt b/build.sbt
index 07b2651..f5f18cf 100644
--- a/build.sbt
+++ b/build.sbt
@@ -212,9 +212,10 @@ lazy val grpcIntTest =
     .settings(
       name := "pekko-projection-grpc-int-test",
       publish / skip := true,
-      Test / parallelExecution := false,
-      // we need to access snapshot jars for pekko-persistence-r2dbc
-      resolvers += Resolver.ApacheMavenSnapshotsRepo)
+      // following is needed by Agrona lib
+      // https://github.com/aeron-io/agrona/wiki/Change-Log#200-2024-12-17
+      Test / fork := true,
+      Test / javaOptions += 
"--add-opens=java.base/jdk.internal.misc=ALL-UNNAMED")
     .dependsOn(grpcTest % "test->test;test->compile")
     .dependsOn(eventsourced % Test)
     .dependsOn(r2dbc % Test)
diff --git 
a/grpc-int-test/src/test/scala/org/apache/pekko/projection/grpc/replication/ReplicationIntegrationSpec.scala
 
b/grpc-int-test/src/test/scala/org/apache/pekko/projection/grpc/replication/ReplicationIntegrationSpec.scala
new file mode 100644
index 0000000..bbc2a32
--- /dev/null
+++ 
b/grpc-int-test/src/test/scala/org/apache/pekko/projection/grpc/replication/ReplicationIntegrationSpec.scala
@@ -0,0 +1,330 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2009-2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.projection.grpc.replication
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.testkit.typed.scaladsl.ActorTestKit
+import pekko.actor.testkit.typed.scaladsl.LogCapturing
+import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
+import pekko.actor.typed.ActorRef
+import pekko.actor.typed.ActorSystem
+import pekko.actor.typed.scaladsl.LoggerOps
+import pekko.actor.typed.scaladsl.adapter.ClassicActorSystemOps
+import pekko.cluster.MemberStatus
+import pekko.cluster.sharding.typed.scaladsl.ClusterSharding
+import pekko.cluster.typed.Cluster
+import pekko.cluster.typed.Join
+import pekko.grpc.GrpcClientSettings
+import pekko.http.scaladsl.Http
+import pekko.persistence.typed.ReplicaId
+import pekko.persistence.typed.crdt.LwwTime
+import pekko.persistence.typed.scaladsl.Effect
+import pekko.persistence.typed.scaladsl.EventSourcedBehavior
+import pekko.projection.grpc.TestContainerConf
+import pekko.projection.grpc.TestDbLifecycle
+import pekko.projection.grpc.producer.EventProducerSettings
+import pekko.projection.grpc.replication
+import pekko.projection.grpc.replication.scaladsl.Replica
+import pekko.projection.grpc.replication.scaladsl.ReplicatedBehaviors
+import pekko.projection.grpc.replication.scaladsl.Replication
+import pekko.projection.grpc.replication.scaladsl.ReplicationSettings
+import pekko.projection.r2dbc.R2dbcProjectionSettings
+import pekko.projection.r2dbc.scaladsl.R2dbcReplication
+import pekko.testkit.SocketUtil
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.wordspec.AnyWordSpecLike
+import org.slf4j.LoggerFactory
+
+import scala.concurrent.Await
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.concurrent.duration.DurationInt
+
+object ReplicationIntegrationSpec {
+
+  private def config(dc: ReplicaId): Config =
+    ConfigFactory.parseString(s"""
+       pekko.actor.provider = cluster
+       pekko.actor {
+         serialization-bindings {
+           
"${classOf[replication.ReplicationIntegrationSpec].getName}$$LWWHelloWorld$$Event"
 = jackson-json
+         }
+       }
+       pekko.http.server.enable-http2 = on
+       pekko.persistence.r2dbc {
+          query {
+            refresh-interval = 500 millis
+            # reducing this to have quicker test, triggers backtracking earlier
+            backtracking.behind-current-time = 3 seconds
+          }
+        }
+        pekko.projection.grpc {
+          producer {
+            query-plugin-id = "pekko.persistence.r2dbc.query"
+          }
+        }
+        pekko.projection.r2dbc.offset-store {
+          timestamp-offset-table = "projection_timestamp_offset_store_${dc.id}"
+        }
+        pekko.remote.artery.canonical.host = "127.0.0.1"
+        pekko.remote.artery.canonical.port = 0
+        pekko.actor.testkit.typed {
+          filter-leeway = 10s
+          system-shutdown-default = 30s
+        }
+      """)
+
+  private val DCA = ReplicaId("DCA")
+  private val DCB = ReplicaId("DCB")
+  private val DCC = ReplicaId("DCC")
+
+  object LWWHelloWorld {
+
+    sealed trait Command
+
+    case class Get(replyTo: ActorRef[String]) extends Command
+
+    case class SetGreeting(newGreeting: String, replyTo: ActorRef[Done]) 
extends Command
+
+    sealed trait Event
+
+    case class GreetingChanged(greeting: String, timestamp: LwwTime) extends 
Event
+
+    object State {
+      val initial = State("Hello world", LwwTime(Long.MinValue, ReplicaId("")))
+    }
+
+    case class State(greeting: String, timestamp: LwwTime)
+
+    def apply(replicatedBehaviors: ReplicatedBehaviors[Command, Event, State]) 
=
+      replicatedBehaviors.setup { replicationContext =>
+        EventSourcedBehavior[Command, Event, State](
+          replicationContext.persistenceId,
+          State.initial,
+          {
+            case (State(greeting, _), Get(replyTo)) =>
+              replyTo ! greeting
+              Effect.none
+            case (state, SetGreeting(greeting, replyTo)) =>
+              Effect
+                .persist(
+                  GreetingChanged(
+                    greeting,
+                    
state.timestamp.increase(replicationContext.currentTimeMillis(), 
replicationContext.replicaId)))
+                .thenRun((_: State) => replyTo ! Done)
+          },
+          {
+            case (currentState, GreetingChanged(newGreeting, newTimestamp)) =>
+              if (newTimestamp.isAfter(currentState.timestamp))
+                State(newGreeting, newTimestamp)
+              else currentState
+          })
+      }
+  }
+}
+
+class ReplicationIntegrationSpec(testContainerConf: TestContainerConf)
+    extends ScalaTestWithActorTestKit(
+      pekko.actor.ActorSystem(
+        "ReplicationIntegrationSpecA",
+        ReplicationIntegrationSpec
+          .config(ReplicationIntegrationSpec.DCA)
+          .withFallback(testContainerConf.config))
+        .toTyped)
+    with AnyWordSpecLike
+    with TestDbLifecycle
+    with BeforeAndAfterAll
+    with LogCapturing {
+  import ReplicationIntegrationSpec._
+  implicit val ec: ExecutionContext = system.executionContext
+
+  def this() = this(new TestContainerConf)
+
+  private val logger = 
LoggerFactory.getLogger(classOf[ReplicationIntegrationSpec])
+  override def typedSystem: ActorSystem[_] = testKit.system
+
+  private val systems = Seq[ActorSystem[_]](
+    typedSystem,
+    pekko.actor.ActorSystem(
+      "ReplicationIntegrationSpecB",
+      
ReplicationIntegrationSpec.config(DCB).withFallback(testContainerConf.config))
+      .toTyped,
+    pekko.actor.ActorSystem(
+      "ReplicationIntegrationSpecC",
+      
ReplicationIntegrationSpec.config(DCC).withFallback(testContainerConf.config))
+      .toTyped)
+
+  private val grpcPorts = SocketUtil.temporaryServerAddresses(systems.size, 
"127.0.0.1").map(_.getPort)
+  private val allDcsAndPorts = Seq(DCA, DCB, DCC).zip(grpcPorts)
+  private val allReplicas = allDcsAndPorts.map {
+    case (id, port) =>
+      Replica(id, 2, GrpcClientSettings.connectToServiceAt("127.0.0.1", 
port).withTls(false))
+  }.toSet
+
+  private val testKitsPerDc = Map(DCA -> testKit, DCB -> 
ActorTestKit(systems(1)), DCC -> ActorTestKit(systems(2)))
+  private val systemPerDc = Map(DCA -> system, DCB -> systems(1), DCC -> 
systems(2))
+  private var replicatedEventSourcingOverGrpcPerDc: Map[ReplicaId, 
Replication[LWWHelloWorld.Command]] = Map.empty
+  private val entityIds = Set("one", "two", "three")
+
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+    // We can share the journal to save a bit of work, because the persistence 
id contains
+    // the dc so is unique (this is ofc completely synthetic, the whole point 
of replication
+    // over grpc is to replicate between different dcs/regions with completely 
separate databases).
+    // The offset tables need to be separate though to not get conflicts on 
projection names
+    systemPerDc.values.foreach { system =>
+      val r2dbcProjectionSettings = R2dbcProjectionSettings(system)
+      Await.result(
+        r2dbcExecutor.updateOne("beforeAll delete")(
+          _.createStatement(s"delete from 
${r2dbcProjectionSettings.timestampOffsetTableWithSchema}")),
+        10.seconds)
+
+    }
+  }
+
+  def startReplica(replicaSystem: ActorSystem[_], selfReplicaId: ReplicaId): 
Replication[LWWHelloWorld.Command] = {
+    val settings = ReplicationSettings[LWWHelloWorld.Command](
+      "hello-world",
+      selfReplicaId,
+      EventProducerSettings(replicaSystem),
+      allReplicas,
+      10.seconds,
+      8,
+      R2dbcReplication())
+    
Replication.grpcReplication(settings)(ReplicationIntegrationSpec.LWWHelloWorld.apply)(replicaSystem)
+  }
+
+  "Replication over gRPC" should {
+    "form three one node clusters" in {
+      testKitsPerDc.values.foreach { testKit =>
+        val cluster = Cluster(testKit.system)
+        cluster.manager ! Join(cluster.selfMember.address)
+        testKit.createTestProbe().awaitAssert {
+          cluster.selfMember.status should ===(MemberStatus.Up)
+        }
+      }
+    }
+
+    "start three replicas" in {
+      val replicasStarted = Future.sequence(allReplicas.zipWithIndex.map {
+        case (replica, index) =>
+          val system = systems(index)
+          logger
+            .infoN(
+              "Starting replica [{}], system [{}] on port [{}]",
+              replica.replicaId,
+              system.name,
+              replica.grpcClientSettings.defaultPort)
+          val started = startReplica(system, replica.replicaId)
+          val grpcPort = grpcPorts(index)
+
+          // start producer server
+          Http(system)
+            .newServerAt("127.0.0.1", grpcPort)
+            .bind(started.createSingleServiceHandler())
+            
.map(_.addToCoordinatedShutdown(3.seconds)(system))(system.executionContext)
+            .map(_ => replica.replicaId -> started)
+      })
+
+      replicatedEventSourcingOverGrpcPerDc = replicasStarted.futureValue.toMap
+      logger.info("All three replication/producer services bound")
+    }
+
+    "replicate writes from one dc to the other two" in {
+      val entityTypeKey = 
replicatedEventSourcingOverGrpcPerDc.values.head.entityTypeKey
+      systemPerDc.keys.foreach { dc =>
+        withClue(s"from ${dc.id}") {
+          Future
+            .sequence(entityIds.map { entityId =>
+              logger.infoN("Updating greeting for [{}] from dc [{}]", 
entityId, dc.id)
+              ClusterSharding(systemPerDc(dc))
+                .entityRefFor(entityTypeKey, entityId)
+                .ask(LWWHelloWorld.SetGreeting(s"hello 1 from ${dc.id}", _))
+            })
+            .futureValue
+
+          testKitsPerDc.values.foreach { testKit =>
+            withClue(s"on ${system.name}") {
+              val probe = testKit.createTestProbe()
+
+              entityIds.foreach { entityId =>
+                withClue(s"for entity id $entityId") {
+                  val entityRef = ClusterSharding(testKit.system)
+                    .entityRefFor(entityTypeKey, entityId)
+
+                  probe.awaitAssert({
+                      entityRef
+                        .ask(LWWHelloWorld.Get.apply)
+                        .futureValue should ===(s"hello 1 from ${dc.id}")
+                    }, 10.seconds)
+                }
+              }
+            }
+          }
+        }
+      }
+    }
+
+    "replicate concurrent writes to the other DCs" in (2 to 4).foreach { 
greetingNo =>
+      withClue(s"Greeting $greetingNo") {
+        val entityTypeKey = 
replicatedEventSourcingOverGrpcPerDc.values.head.entityTypeKey
+        Future
+          .sequence(systemPerDc.keys.map { dc =>
+            withClue(s"from ${dc.id}") {
+              Future.sequence(entityIds.map { entityId =>
+                logger.infoN("Updating greeting for [{}] from dc [{}]", 
entityId, dc.id)
+                ClusterSharding(systemPerDc(dc))
+                  .entityRefFor(entityTypeKey, entityId)
+                  .ask(LWWHelloWorld.SetGreeting(s"hello $greetingNo from 
${dc.id}", _))
+              })
+            }
+          })
+          .futureValue // all three updated in roughly parallel
+
+        // All 3 should eventually arrive at the same value
+        testKit
+          .createTestProbe()
+          .awaitAssert(
+            {
+              entityIds.foreach { entityId =>
+                withClue(s"for entity id $entityId") {
+                  testKitsPerDc.values.map { testKit =>
+                    val entityRef = ClusterSharding(testKit.system)
+                      .entityRefFor(entityTypeKey, entityId)
+
+                    entityRef
+                      .ask(LWWHelloWorld.Get.apply)
+                      .futureValue
+                  }.toSet should have size 1
+                }
+              }
+            },
+            20.seconds)
+      }
+    }
+  }
+
+  protected override def afterAll(): Unit = {
+    logger.info("Shutting down all three DCs")
+    systems.foreach(_.terminate()) // speed up termination by terminating all 
at the once
+    // and then make sure they are completely shutdown
+    systems.foreach { system =>
+      ActorTestKit.shutdown(system)
+    }
+    super.afterAll()
+  }
+}
diff --git 
a/grpc-int-test/src/test/scala/org/apache/pekko/projection/grpc/replication/ReplicationJavaDSLIntegrationSpec.scala
 
b/grpc-int-test/src/test/scala/org/apache/pekko/projection/grpc/replication/ReplicationJavaDSLIntegrationSpec.scala
new file mode 100644
index 0000000..44a6ce1
--- /dev/null
+++ 
b/grpc-int-test/src/test/scala/org/apache/pekko/projection/grpc/replication/ReplicationJavaDSLIntegrationSpec.scala
@@ -0,0 +1,322 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2009-2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.projection.grpc.replication
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.testkit.typed.scaladsl.ActorTestKit
+import pekko.actor.testkit.typed.scaladsl.LogCapturing
+import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
+import pekko.actor.typed.ActorRef
+import pekko.actor.typed.ActorSystem
+import pekko.actor.typed.scaladsl.LoggerOps
+import pekko.actor.typed.scaladsl.adapter.ClassicActorSystemOps
+import pekko.cluster.MemberStatus
+import pekko.cluster.sharding.typed.javadsl.ClusterSharding
+import pekko.cluster.typed.Cluster
+import pekko.cluster.typed.Join
+import pekko.grpc.GrpcClientSettings
+import pekko.http.javadsl.Http
+import pekko.http.javadsl.ServerBinding
+import pekko.persistence.typed.ReplicaId
+import pekko.persistence.typed.crdt.LwwTime
+import pekko.persistence.typed.javadsl.CommandHandler
+import pekko.persistence.typed.javadsl.EventHandler
+import pekko.persistence.typed.javadsl.EventSourcedBehavior
+import pekko.persistence.typed.javadsl.ReplicationContext
+import pekko.projection.grpc.TestContainerConf
+import pekko.projection.grpc.TestDbLifecycle
+import pekko.projection.grpc.producer.EventProducerSettings
+import pekko.projection.grpc.replication
+import pekko.projection.grpc.replication.javadsl.Replica
+import pekko.projection.grpc.replication.javadsl.ReplicatedBehaviors
+import pekko.projection.grpc.replication.javadsl.Replication
+import pekko.projection.grpc.replication.javadsl.ReplicationSettings
+import pekko.projection.r2dbc.R2dbcProjectionSettings
+import pekko.projection.r2dbc.javadsl.R2dbcReplication
+import pekko.testkit.SocketUtil
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.wordspec.AnyWordSpecLike
+import org.slf4j.LoggerFactory
+
+import java.time.Duration
+import scala.jdk.FutureConverters._
+import scala.concurrent.Await
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.concurrent.duration.DurationInt
+import scala.jdk.CollectionConverters._
+
+object ReplicationJavaDSLIntegrationSpec {
+
+  private def config(dc: ReplicaId): Config =
+    ConfigFactory.parseString(s"""
+       pekko.actor.provider = cluster
+       pekko.actor {
+         serialization-bindings {
+           
"${classOf[replication.ReplicationJavaDSLIntegrationSpec].getName}$$LWWHelloWorld$$Event"
 = jackson-json
+         }
+       }
+       pekko.http.server.enable-http2 = on
+       pekko.persistence.r2dbc {
+          query {
+            refresh-interval = 500 millis
+            # reducing this to have quicker test, triggers backtracking earlier
+            backtracking.behind-current-time = 3 seconds
+          }
+        }
+        pekko.projection.grpc {
+          producer {
+            query-plugin-id = "pekko.persistence.r2dbc.query"
+          }
+        }
+        pekko.projection.r2dbc.offset-store {
+          timestamp-offset-table = "projection_timestamp_offset_store_${dc.id}"
+        }
+        pekko.remote.artery.canonical.host = "127.0.0.1"
+        pekko.remote.artery.canonical.port = 0
+        pekko.actor.testkit.typed {
+          filter-leeway = 10s
+          system-shutdown-default = 30s
+        }
+      """)
+
+  private val DCA = ReplicaId("DCA")
+  private val DCB = ReplicaId("DCB")
+  private val DCC = ReplicaId("DCC")
+
+  object LWWHelloWorld {
+
+    sealed trait Command
+
+    case class Get(replyTo: ActorRef[String]) extends Command
+
+    case class SetGreeting(newGreeting: String, replyTo: ActorRef[Done]) 
extends Command
+
+    sealed trait Event
+
+    case class GreetingChanged(greeting: String, timestamp: LwwTime) extends 
Event
+
+    object State {
+      val initial = State("Hello world", LwwTime(Long.MinValue, ReplicaId("")))
+    }
+
+    case class State(greeting: String, timestamp: LwwTime)
+
+    def create(replicatedBehaviors: ReplicatedBehaviors[Command, Event, 
State]) =
+      replicatedBehaviors.setup { replicationContext => new 
LWWHelloWorldBehavior(replicationContext) }
+
+    class LWWHelloWorldBehavior(replicationContext: ReplicationContext)
+        extends EventSourcedBehavior[Command, Event, 
State](replicationContext.persistenceId) {
+      protected def emptyState: State = State.initial
+
+      protected def commandHandler(): CommandHandler[Command, Event, State] =
+        newCommandHandlerBuilder()
+          .forAnyState()
+          .onCommand(classOf[Get],
+            { (state: State, command: Get) =>
+              command.replyTo.tell(state.greeting)
+              Effect.none()
+            })
+          .onCommand(
+            classOf[SetGreeting],
+            { (state: State, command: SetGreeting) =>
+              Effect
+                .persist(
+                  GreetingChanged(
+                    greeting = command.newGreeting,
+                    timestamp =
+                      
state.timestamp.increase(replicationContext.currentTimeMillis(), 
replicationContext.replicaId)))
+                .thenReply(command.replyTo, _ => Done)
+            })
+          .build()
+
+      protected def eventHandler(): EventHandler[State, Event] =
+        newEventHandlerBuilder()
+          .forAnyState()
+          .onEvent(classOf[GreetingChanged],
+            { (currentState: State, event: GreetingChanged) =>
+              if (event.timestamp.isAfter(currentState.timestamp)) 
State(event.greeting, event.timestamp)
+              else currentState
+            })
+          .build()
+    }
+  }
+}
+
+// A shorter version of ReplicationIntegrationSpec covering the Java DSL for 
bootstrapping
+class ReplicationJavaDSLIntegrationSpec(testContainerConf: TestContainerConf)
+    extends ScalaTestWithActorTestKit(
+      pekko.actor.ActorSystem(
+        "ReplicationJavaDSLIntegrationSpecA",
+        ReplicationJavaDSLIntegrationSpec
+          .config(ReplicationJavaDSLIntegrationSpec.DCA)
+          .withFallback(testContainerConf.config))
+        .toTyped)
+    with AnyWordSpecLike
+    with TestDbLifecycle
+    with BeforeAndAfterAll
+    with LogCapturing {
+  import ReplicationJavaDSLIntegrationSpec._
+  implicit val ec: ExecutionContext = system.executionContext
+
+  def this() = this(new TestContainerConf)
+
+  private val logger = 
LoggerFactory.getLogger(classOf[ReplicationIntegrationSpec])
+  override def typedSystem: ActorSystem[_] = testKit.system
+
+  private val systems = Seq[ActorSystem[_]](
+    typedSystem,
+    pekko.actor.ActorSystem(
+      "ReplicationJavaDSLIntegrationSpecB",
+      
ReplicationJavaDSLIntegrationSpec.config(DCB).withFallback(testContainerConf.config))
+      .toTyped,
+    pekko.actor.ActorSystem(
+      "ReplicationJavaDSLIntegrationSpecC",
+      
ReplicationJavaDSLIntegrationSpec.config(DCC).withFallback(testContainerConf.config))
+      .toTyped)
+
+  private val grpcPorts = SocketUtil.temporaryServerAddresses(systems.size, 
"127.0.0.1").map(_.getPort)
+  private val allDcsAndPorts = Seq(DCA, DCB, DCC).zip(grpcPorts)
+  private val allReplicas = allDcsAndPorts.map {
+    case (id, port) =>
+      Replica.create(id, 2, GrpcClientSettings.connectToServiceAt("127.0.0.1", 
port).withTls(false))
+  }
+
+  private val testKitsPerDc = Map(DCA -> testKit, DCB -> 
ActorTestKit(systems(1)), DCC -> ActorTestKit(systems(2)))
+  private val systemPerDc = Map(DCA -> system, DCB -> systems(1), DCC -> 
systems(2))
+  private var replicatedEventSourcingOverGrpcPerDc: Map[ReplicaId, 
Replication[LWWHelloWorld.Command]] = Map.empty
+  private val entityIds = Set("one", "two", "three")
+
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+    // We can share the journal to save a bit of work, because the persistence 
id contains
+    // the dc so is unique (this is ofc completely synthetic, the whole point 
of replication
+    // over grpc is to replicate between different dcs/regions with completely 
separate databases).
+    // The offset tables need to be separate though to not get conflicts on 
projection names
+    systemPerDc.values.foreach { system =>
+      val r2dbcProjectionSettings = R2dbcProjectionSettings(system)
+      Await.result(
+        r2dbcExecutor.updateOne("beforeAll delete")(
+          _.createStatement(s"delete from 
${r2dbcProjectionSettings.timestampOffsetTableWithSchema}")),
+        10.seconds)
+
+    }
+  }
+
+  def startReplica(replicaSystem: ActorSystem[_], selfReplicaId: ReplicaId): 
Replication[LWWHelloWorld.Command] = {
+    val settings = ReplicationSettings.create(
+      classOf[LWWHelloWorld.Command],
+      "hello-world-java",
+      selfReplicaId,
+      EventProducerSettings.apply(replicaSystem),
+      allReplicas.toSet.asJava: java.util.Set[Replica],
+      Duration.ofSeconds(10),
+      8,
+      R2dbcReplication.create(system))
+    Replication.grpcReplication(settings, LWWHelloWorld.create _, 
replicaSystem)
+  }
+
+  "Replication over gRPC" should {
+    "form three one node clusters" in {
+      testKitsPerDc.values.foreach { testKit =>
+        val cluster = Cluster(testKit.system)
+        cluster.manager ! Join(cluster.selfMember.address)
+        testKit.createTestProbe().awaitAssert {
+          cluster.selfMember.status should ===(MemberStatus.Up)
+        }
+      }
+    }
+
+    "start three replicas" in {
+      val replicasStarted = Future.sequence(allReplicas.zipWithIndex.map {
+        case (replica, index) =>
+          val system = systems(index)
+          logger
+            .infoN(
+              "Starting replica [{}], system [{}] on port [{}]",
+              replica.replicaId,
+              system.name,
+              replica.grpcClientSettings.defaultPort)
+          val started = startReplica(system, replica.replicaId)
+          val grpcPort = grpcPorts(index)
+
+          // start producer server
+          Http(system)
+            .newServerAt("127.0.0.1", grpcPort)
+            .bind(started.createSingleServiceHandler())
+            .asScala
+            .map { binding: ServerBinding =>
+              binding.addToCoordinatedShutdown(Duration.ofSeconds(3), system)
+              replica.replicaId -> started
+            }
+
+      })
+
+      replicatedEventSourcingOverGrpcPerDc = replicasStarted.futureValue.toMap
+      logger.info("All three replication/producer services bound")
+    }
+
+    "replicate writes from one dc to the other two" in {
+      val entityTypeKey = 
replicatedEventSourcingOverGrpcPerDc.values.head.entityTypeKey
+      systemPerDc.keys.foreach { dc =>
+        withClue(s"from ${dc.id}") {
+          Future
+            .sequence(entityIds.map { entityId =>
+              logger.infoN("Updating greeting for [{}] from dc [{}]", 
entityId, dc.id)
+              ClusterSharding
+                .get(systemPerDc(dc))
+                .entityRefFor(entityTypeKey, entityId)
+                .ask(LWWHelloWorld.SetGreeting(s"hello 1 from ${dc.id}", _), 
Duration.ofSeconds(3))
+                .asScala
+            })
+            .futureValue
+
+          testKitsPerDc.values.foreach { testKit =>
+            withClue(s"on ${system.name}") {
+              val probe = testKit.createTestProbe()
+
+              entityIds.foreach { entityId =>
+                withClue(s"for entity id $entityId") {
+                  val entityRef = ClusterSharding
+                    .get(testKit.system)
+                    .entityRefFor(entityTypeKey, entityId)
+
+                  probe.awaitAssert({
+                      entityRef
+                        .ask(LWWHelloWorld.Get.apply, Duration.ofSeconds(10))
+                        .asScala
+                        .futureValue should ===(s"hello 1 from ${dc.id}")
+                    }, 10.seconds)
+                }
+              }
+            }
+          }
+        }
+      }
+    }
+  }
+
+  protected override def afterAll(): Unit = {
+    logger.info("Shutting down all three DCs")
+    systems.foreach(_.terminate()) // speed up termination by terminating all 
at the once
+    // and then make sure they are completely shutdown
+    systems.foreach { system =>
+      ActorTestKit.shutdown(system)
+    }
+    super.afterAll()
+  }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to