This is an automated email from the ASF dual-hosted git repository.
pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-projection.git
The following commit(s) were added to refs/heads/main by this push:
new fcd4598 add grpc int tests module (#467)
fcd4598 is described below
commit fcd459898c2d2b3ed3d30efd644d5d6d65f6bfcc
Author: PJ Fanning <[email protected]>
AuthorDate: Fri May 8 12:33:44 2026 +0100
add grpc int tests module (#467)
* Add grpc-int-test module with integration tests ported from
akka-projection
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-projection/sessions/bea6fbd0-2080-45d8-beb2-b0874db6b9e4
Co-authored-by: pjfanning <[email protected]>
* Fix EventTimestampQuerySpec: use timestampB in assertion for event B
Agent-Logs-Url:
https://github.com/pjfanning/incubator-pekko-projection/sessions/bea6fbd0-2080-45d8-beb2-b0874db6b9e4
Co-authored-by: pjfanning <[email protected]>
* Update test dependencies to use lowercase 'test'
* compile issue
* Update TestContainerConf.scala
* workflow
* Update Dependencies.scala
* snapshot jars
* Update Dependencies.scala
* Update build.sbt
* Update TestEntity.scala
---------
Co-authored-by: copilot-swe-agent[bot]
<[email protected]>
Co-authored-by: pjfanning <[email protected]>
---
.github/workflows/integration-tests-grpc.yml | 63 ++++
build.sbt | 17 +-
.../src/test/resources/db/default-init.sql | 96 ++++++
grpc-int-test/src/test/resources/logback-test.xml | 40 +++
grpc-int-test/src/test/resources/persistence.conf | 34 ++
.../pekko/projection/grpc/IntegrationSpec.scala | 345 +++++++++++++++++++++
.../pekko/projection/grpc/TestContainerConf.scala | 61 ++++
.../apache/pekko/projection/grpc/TestData.scala | 41 +++
.../pekko/projection/grpc/TestDbLifecycle.scala | 62 ++++
.../apache/pekko/projection/grpc/TestEntity.scala | 57 ++++
.../scaladsl/EventTimestampQuerySpec.scala | 129 ++++++++
.../consumer/scaladsl/LoadEventQuerySpec.scala | 149 +++++++++
project/Dependencies.scala | 13 +
13 files changed, 1105 insertions(+), 2 deletions(-)
diff --git a/.github/workflows/integration-tests-grpc.yml
b/.github/workflows/integration-tests-grpc.yml
new file mode 100644
index 0000000..9afe7a4
--- /dev/null
+++ b/.github/workflows/integration-tests-grpc.yml
@@ -0,0 +1,63 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# license agreements; and to You under the Apache License, version 2.0:
+#
+# https://www.apache.org/licenses/LICENSE-2.0
+#
+# This file is part of the Apache Pekko project, which was derived from Akka.
+#
+
+name: Integration Tests for gRPC
+
+on:
+ pull_request:
+ push:
+ branches:
+ - main
+ tags-ignore: [ v.* ]
+
+permissions:
+ contents: read
+
+jobs:
+ test:
+ name: Build and Test Integration for gRPC
+ runs-on: ubuntu-22.04
+ strategy:
+ fail-fast: false
+ matrix:
+ include:
+ - { java-version: 17, scala-version: 2.13, sbt-opts: '' }
+
+ steps:
+ - name: Checkout
+ uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd #
v6.0.2
+ with:
+ fetch-depth: 0
+ fetch-tags: true
+
+ - name: Checkout GitHub merge
+ if: github.event.pull_request
+ run: |-
+ git fetch origin pull/${{ github.event.pull_request.number
}}/merge:scratch
+ git checkout scratch
+
+ - name: Cache Coursier cache
+ uses: coursier/cache-action@90c37294538be80a558fd665531fcdc2b467b475 #
v8.1.0
+
+ - name: Setup JDK ${{ matrix.java-version }}
+ uses: actions/setup-java@be666c2fcd27ec809703dec50e508c2fdc7f6654 #
v5.2.0
+ with:
+ java-version: ${{ matrix.java-version }}
+ distribution: temurin
+
+ - name: Install sbt
+ uses: sbt/setup-sbt@508b753e53cb6095967669e0911487d2b9bc9f41 # v1.1.22
+
+      # Runs only the grpc-int-test module. Extra sbt arguments are taken from the
+      # `sbt-opts` field of the matrix entry (the only per-entry options field the
+      # matrix defines); it is empty for the single entry configured in this workflow.
+      - name: Run all integration tests with default Scala and Java ${{ matrix.java-version }}
+        run: sbt "grpc-int-test/test" ${{ matrix.sbt-opts }}
+        env: # Disable Ryuk resource reaper since we always spin up fresh VMs
+          TESTCONTAINERS_RYUK_DISABLED: true
+
+ - name: Print logs on failure
+ if: ${{ failure() }}
+ run: find . -name "*.log" -exec ./scripts/cat-log.sh {} \;
diff --git a/build.sbt b/build.sbt
index 63e3521..aaca412 100644
--- a/build.sbt
+++ b/build.sbt
@@ -179,6 +179,20 @@ lazy val grpc =
.dependsOn(eventsourced)
.dependsOn(testkit % Test)
+// Integration tests for the gRPC module: never published, MiMa disabled,
+// and tests inside the module are not executed in parallel.
+lazy val grpcIntTest =
+  Project(id = "grpc-int-test", base = file("grpc-int-test"))
+    .disablePlugins(MimaPlugin)
+    .settings(Dependencies.grpcIntTest)
+    .settings(
+      name := "pekko-projection-grpc-int-test",
+      publish / skip := true,
+      Test / parallelExecution := false,
+      // we need to access snapshot jars for pekko-persistence-r2dbc
+      resolvers += Resolver.ApacheMavenSnapshotsRepo)
+    .dependsOn(
+      grpc % "test->test;test->compile",
+      eventsourced % Test,
+      testkit % Test)
+
lazy val userProjects: Seq[ProjectReference] = List[ProjectReference](
core, jdbc, slick, cassandra, eventsourced, kafka, `durable-state`, grpc,
testkit)
@@ -264,8 +278,7 @@ lazy val billOfMaterials = Project("bill-of-materials",
file("bill-of-materials"
lazy val root = Project(id = "projection", base = file("."))
.aggregate(userProjects: _*)
- .aggregate(billOfMaterials, coreTest, kafkaTest, cassandraTest, jdbcIntTest,
slickIntTest, examples,
- integrationExamples, docs)
+ .aggregate(billOfMaterials, coreTest, kafkaTest, cassandraTest, examples,
integrationExamples, docs)
.settings(
publish / skip := true,
name := "pekko-projection-root")
diff --git a/grpc-int-test/src/test/resources/db/default-init.sql
b/grpc-int-test/src/test/resources/db/default-init.sql
new file mode 100644
index 0000000..7bf59c8
--- /dev/null
+++ b/grpc-int-test/src/test/resources/db/default-init.sql
@@ -0,0 +1,96 @@
+-- Note: duplicated from ddl-scripts to be available on test classpath
+CREATE TABLE IF NOT EXISTS event_journal(
+ slice INT NOT NULL,
+ entity_type VARCHAR(255) NOT NULL,
+ persistence_id VARCHAR(255) NOT NULL,
+ seq_nr BIGINT NOT NULL,
+ db_timestamp timestamp with time zone NOT NULL,
+
+ event_ser_id INTEGER NOT NULL,
+ event_ser_manifest VARCHAR(255) NOT NULL,
+ event_payload BYTEA NOT NULL,
+
+ deleted BOOLEAN DEFAULT FALSE NOT NULL,
+ writer VARCHAR(255) NOT NULL,
+ adapter_manifest VARCHAR(255),
+ tags TEXT ARRAY,
+
+ meta_ser_id INTEGER,
+ meta_ser_manifest VARCHAR(255),
+ meta_payload BYTEA,
+
+ PRIMARY KEY(persistence_id, seq_nr)
+);
+
+-- `event_journal_slice_idx` is only needed if the slice based queries are used
+CREATE INDEX IF NOT EXISTS event_journal_slice_idx ON event_journal(slice,
entity_type, db_timestamp, seq_nr);
+
+CREATE TABLE IF NOT EXISTS snapshot(
+ slice INT NOT NULL,
+ entity_type VARCHAR(255) NOT NULL,
+ persistence_id VARCHAR(255) NOT NULL,
+ seq_nr BIGINT NOT NULL,
+ write_timestamp BIGINT NOT NULL,
+ ser_id INTEGER NOT NULL,
+ ser_manifest VARCHAR(255) NOT NULL,
+ snapshot BYTEA NOT NULL,
+ meta_ser_id INTEGER,
+ meta_ser_manifest VARCHAR(255),
+ meta_payload BYTEA,
+
+ PRIMARY KEY(persistence_id)
+);
+
+CREATE TABLE IF NOT EXISTS durable_state (
+ slice INT NOT NULL,
+ entity_type VARCHAR(255) NOT NULL,
+ persistence_id VARCHAR(255) NOT NULL,
+ revision BIGINT NOT NULL,
+ db_timestamp timestamp with time zone NOT NULL,
+
+ state_ser_id INTEGER NOT NULL,
+ state_ser_manifest VARCHAR(255),
+ state_payload BYTEA NOT NULL,
+ tags TEXT ARRAY,
+
+ PRIMARY KEY(persistence_id, revision)
+);
+
+-- `durable_state_slice_idx` is only needed if the slice based queries are used
+CREATE INDEX IF NOT EXISTS durable_state_slice_idx ON durable_state(slice,
entity_type, db_timestamp, revision);
+
+-- Primitive offset types are stored in this table.
+-- If only timestamp based offsets are used this table is optional.
+-- Configure pekko.projection.r2dbc.offset-store.offset-table="" if the table
is not created.
+CREATE TABLE IF NOT EXISTS projection_offset_store (
+ projection_name VARCHAR(255) NOT NULL,
+ projection_key VARCHAR(255) NOT NULL,
+ current_offset VARCHAR(255) NOT NULL,
+ manifest VARCHAR(32) NOT NULL,
+ mergeable BOOLEAN NOT NULL,
+ last_updated BIGINT NOT NULL,
+ PRIMARY KEY(projection_name, projection_key)
+);
+
+-- Timestamp based offsets are stored in this table.
+CREATE TABLE IF NOT EXISTS projection_timestamp_offset_store (
+ projection_name VARCHAR(255) NOT NULL,
+ projection_key VARCHAR(255) NOT NULL,
+ slice INT NOT NULL,
+ persistence_id VARCHAR(255) NOT NULL,
+ seq_nr BIGINT NOT NULL,
+ -- timestamp_offset is the db_timestamp of the original event
+ timestamp_offset timestamp with time zone NOT NULL,
+ -- timestamp_consumed is when the offset was stored
+ -- the consumer lag is timestamp_consumed - timestamp_offset
+ timestamp_consumed timestamp with time zone NOT NULL,
+ PRIMARY KEY(slice, projection_name, timestamp_offset, persistence_id, seq_nr)
+);
+
+CREATE TABLE IF NOT EXISTS projection_management (
+ projection_name VARCHAR(255) NOT NULL,
+ projection_key VARCHAR(255) NOT NULL,
+ paused BOOLEAN NOT NULL,
+ last_updated BIGINT NOT NULL,
+ PRIMARY KEY(projection_name, projection_key)
+);
diff --git a/grpc-int-test/src/test/resources/logback-test.xml
b/grpc-int-test/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..2e10b3b
--- /dev/null
+++ b/grpc-int-test/src/test/resources/logback-test.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<configuration>
+
+ <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+ <encoder>
+ <pattern>[%date{ISO8601}] [%level] [%logger] [%X{pekkoAddress}]
[%marker] [%thread] - %msg%n</pattern>
+ </encoder>
+ </appender>
+
+ <appender name="CapturingAppender"
class="org.apache.pekko.actor.testkit.typed.internal.CapturingAppender"/>
+ <logger
name="org.apache.pekko.actor.testkit.typed.internal.CapturingAppenderDelegate">
+ <appender-ref ref="STDOUT"/>
+ </logger>
+
+ <logger name="org.apache.pekko.projection.grpc" level="TRACE" />
+ <logger name="org.apache.pekko.projection.r2dbc" level="DEBUG" />
+ <logger name="org.apache.pekko.persistence.r2dbc" level="DEBUG" />
+
+
+ <root level="INFO">
+ <appender-ref ref="CapturingAppender"/>
+ </root>
+
+</configuration>
diff --git a/grpc-int-test/src/test/resources/persistence.conf
b/grpc-int-test/src/test/resources/persistence.conf
new file mode 100644
index 0000000..8149189
--- /dev/null
+++ b/grpc-int-test/src/test/resources/persistence.conf
@@ -0,0 +1,34 @@
+# SPDX-License-Identifier: Apache-2.0
+
+pekko {
+ persistence {
+ journal {
+ plugin = "pekko.persistence.r2dbc.journal"
+ }
+ snapshot-store {
+ plugin = "pekko.persistence.r2dbc.snapshot"
+ }
+ r2dbc {
+ journal {
+ publish-events = on
+ }
+ query {
+ # Note that this can probably be decreased if we can use db time, see
use-app-timestamp.
+ behind-current-time = 500 millis
+ }
+
+ # We trust that system time will not move backward for two subsequent
persists from the same entity.
+ db-timestamp-monotonic-increasing = on
+
+ # Enable app-side timestamps as a workaround for databases without
monotonically increasing db timestamps.
+ use-app-timestamp = on
+ }
+ }
+
+ projection.r2dbc {
+ offset-store {
+ # only timestamp based offsets
+ offset-table = ""
+ }
+ }
+}
diff --git
a/grpc-int-test/src/test/scala/org/apache/pekko/projection/grpc/IntegrationSpec.scala
b/grpc-int-test/src/test/scala/org/apache/pekko/projection/grpc/IntegrationSpec.scala
new file mode 100644
index 0000000..1a6ca84
--- /dev/null
+++
b/grpc-int-test/src/test/scala/org/apache/pekko/projection/grpc/IntegrationSpec.scala
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.projection.grpc
+
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.concurrent.duration._
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.testkit.typed.scaladsl.LogCapturing
+import pekko.actor.testkit.typed.scaladsl.LoggingTestKit
+import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
+import pekko.actor.typed.ActorRef
+import pekko.actor.typed.ActorSystem
+import pekko.actor.typed.scaladsl.LoggerOps
+import pekko.grpc.GrpcClientSettings
+import pekko.grpc.GrpcServiceException
+import pekko.grpc.scaladsl.Metadata
+import pekko.grpc.scaladsl.MetadataBuilder
+import pekko.grpc.scaladsl.ServiceHandler
+import pekko.http.scaladsl.Http
+import pekko.http.scaladsl.model.HttpRequest
+import pekko.http.scaladsl.model.HttpResponse
+import pekko.persistence.query.typed.EventEnvelope
+import pekko.persistence.typed.PersistenceId
+import pekko.projection.ProjectionBehavior
+import pekko.projection.ProjectionId
+import pekko.projection.eventsourced.scaladsl.EventSourcedProvider
+import pekko.projection.grpc.consumer.GrpcQuerySettings
+import pekko.projection.grpc.consumer.scaladsl.GrpcReadJournal
+import pekko.projection.grpc.producer.EventProducerSettings
+import pekko.projection.grpc.producer.scaladsl.EventProducer
+import
pekko.projection.grpc.producer.scaladsl.EventProducer.EventProducerSource
+import pekko.projection.grpc.producer.scaladsl.EventProducer.Transformation
+import pekko.projection.grpc.producer.scaladsl.EventProducerInterceptor
+import pekko.projection.r2dbc.scaladsl.R2dbcHandler
+import pekko.projection.r2dbc.scaladsl.R2dbcProjection
+import pekko.projection.r2dbc.scaladsl.R2dbcSession
+import pekko.projection.scaladsl.Handler
+import pekko.testkit.SocketUtil
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
+import io.grpc.Status
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.wordspec.AnyWordSpecLike
+import org.slf4j.LoggerFactory
+
+/** Configuration overrides, probe message and projection handlers shared by the tests in [[IntegrationSpec]]. */
+object IntegrationSpec {
+
+  // Port the spec uses both to bind the producer's HTTP server and to connect the consumer's gRPC client.
+  val grpcPort: Int = SocketUtil.temporaryServerAddress("127.0.0.1").getPort
+
+  val config: Config = ConfigFactory.parseString(s"""
+ pekko.http.server.preview.enable-http2 = on
+ pekko.persistence.r2dbc {
+ query {
+ refresh-interval = 500 millis
+ # reducing this to have quicker test, triggers backtracking earlier
+ backtracking.behind-current-time = 3 seconds
+ }
+ }
+ pekko.projection.grpc {
+ producer {
+ query-plugin-id = "pekko.persistence.r2dbc.query"
+ }
+ }
+ pekko.actor.testkit.typed.filter-leeway = 10s
+ """)
+
+  /** Sent to the test probe for every envelope a handler has processed. */
+  final case class Processed(projectionId: ProjectionId, envelope: EventEnvelope[String])
+
+  /** Plain projection handler that logs each envelope and forwards it to `probe`. */
+  class TestHandler(projectionId: ProjectionId, probe: ActorRef[Processed])
+      extends Handler[EventEnvelope[String]] {
+    private val logger = LoggerFactory.getLogger(getClass)
+
+    override def process(envelope: EventEnvelope[String]): Future[Done] = {
+      logger.debug2("{} Processed {}", projectionId.key, envelope.event)
+      val processed = Processed(projectionId, envelope)
+      probe ! processed
+      Future.successful(Done)
+    }
+  }
+
+  /** R2DBC projection handler that logs each envelope and forwards it to `probe`; the session is not used. */
+  class TestR2dbcHandler(projectionId: ProjectionId, probe: ActorRef[Processed])
+      extends R2dbcHandler[EventEnvelope[String]] {
+    private val logger = LoggerFactory.getLogger(getClass)
+
+    override def process(session: R2dbcSession, envelope: EventEnvelope[String]): Future[Done] = {
+      logger.debug2("{} Processed {}", projectionId.key, envelope.event)
+      val processed = Processed(projectionId, envelope)
+      probe ! processed
+      Future.successful(Done)
+    }
+  }
+}
+
+class IntegrationSpec(testContainerConf: TestContainerConf)
+ extends
ScalaTestWithActorTestKit(IntegrationSpec.config.withFallback(testContainerConf.config))
+ with AnyWordSpecLike
+ with TestDbLifecycle
+ with TestData
+ with BeforeAndAfterAll
+ with LogCapturing {
+ import IntegrationSpec._
+
+ def this() = this(new TestContainerConf)
+
+ override def typedSystem: ActorSystem[_] = system
+ private implicit val ec: ExecutionContext = system.executionContext
+ private val numberOfTests = 4
+
+ // needs to be unique per test case and known up front for setting up the
producer
+ case class TestSource(entityType: String, streamId: String, pid:
PersistenceId)
+ private val testSources = (1 to numberOfTests).map { n =>
+ val entityType = nextEntityType()
+ val streamId = s"stream_id_$n"
+ val pid = nextPid(entityType) // consuming side pid still has entity type
+ TestSource(entityType, streamId, pid)
+ }
+ private val testSourceIterator = testSources.iterator
+
+  /**
+   * Per-test fixture. Takes the next pre-registered [[TestSource]] and offers probes, the
+   * test entity and helpers that spawn a projection reading that source over gRPC.
+   */
+  class TestFixture {
+    val testSource = testSourceIterator.next()
+    def streamId = testSource.streamId
+    def pid = testSource.pid
+    val sliceRange = 0 to 1023
+    val projectionId = randomProjectionId()
+
+    val replyProbe = createTestProbe[Done]()
+    val processedProbe = createTestProbe[Processed]()
+
+    // spawned on first access
+    lazy val entity = spawn(TestEntity(pid))
+
+    // Builds a fresh gRPC-backed source provider on every call.
+    private def sourceProvider = {
+      val querySettings =
+        GrpcQuerySettings(streamId).withAdditionalRequestMetadata(
+          new MetadataBuilder().addText("x-secret", "top_secret").build())
+      val clientSettings =
+        GrpcClientSettings.connectToServiceAt("127.0.0.1", grpcPort).withTls(false)
+      val readJournal = GrpcReadJournal(querySettings, clientSettings, protobufDescriptors = Nil)
+
+      // FIXME: error prone that it needs to be passed both to GrpcReadJournal and here?
+      // but on the consuming side we don't know about the producing side entity types
+      EventSourcedProvider.eventsBySlices[String](system, readJournal, streamId, sliceRange.min, sliceRange.max)
+    }
+
+    /** Spawns an at-least-once projection whose handler reports to `processedProbe`. */
+    def spawnAtLeastOnceProjection(): ActorRef[ProjectionBehavior.Command] = {
+      val projection =
+        R2dbcProjection.atLeastOnceAsync(
+          projectionId,
+          settings = None,
+          sourceProvider = sourceProvider,
+          handler = () => new TestHandler(projectionId, processedProbe.ref))
+      spawn(ProjectionBehavior(projection))
+    }
+
+    /** Spawns an exactly-once projection whose handler reports to `processedProbe`. */
+    def spawnExactlyOnceProjection(): ActorRef[ProjectionBehavior.Command] = {
+      val projection =
+        R2dbcProjection.exactlyOnce(
+          projectionId,
+          settings = None,
+          sourceProvider = sourceProvider,
+          handler = () => new TestR2dbcHandler(projectionId, processedProbe.ref))
+      spawn(ProjectionBehavior(projection))
+    }
+
+  }
+
+  /**
+   * Binds an HTTP server on `grpcPort` that serves the event producer gRPC service for every
+   * pre-registered test source, and waits for the binding before any test runs.
+   */
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+
+    // events containing "*" are filtered out, everything else is published upper-cased
+    val transformation =
+      Transformation.empty.registerAsyncMapper { (event: String) =>
+        val transformed: Option[String] =
+          if (event.contains("*")) None else Some(event.toUpperCase)
+        Future.successful(transformed)
+      }
+
+    val eventProducerSources =
+      testSources.map { source =>
+        EventProducerSource(source.entityType, source.streamId, transformation, EventProducerSettings(system))
+      }.toSet
+
+    // only lets requests through whose metadata carries the expected "x-secret" text
+    val authInterceptor = new EventProducerInterceptor {
+      def intercept(streamId: String, requestMetadata: Metadata): Future[Done] = {
+        val authorized = requestMetadata.getText("x-secret").contains("top_secret")
+        if (authorized) Future.successful(Done)
+        else throw new GrpcServiceException(Status.PERMISSION_DENIED)
+      }
+    }
+
+    val eventProducerService =
+      EventProducer.grpcServiceHandler(eventProducerSources, Some(authInterceptor))
+
+    val service: HttpRequest => Future[HttpResponse] =
+      ServiceHandler.concatOrNotFound(eventProducerService)
+
+    val binding =
+      Http()
+        .newServerAt("127.0.0.1", grpcPort)
+        .bind(service)
+        .map(_.addToCoordinatedShutdown(3.seconds))
+
+    binding.futureValue
+  }
+
+  /**
+   * Runs the inherited teardown and then stops the database container.
+   * The container is stopped even when the inherited teardown throws, so a failing
+   * shutdown does not leave the container running.
+   */
+  protected override def afterAll(): Unit = {
+    try super.afterAll()
+    finally testContainerConf.stop()
+  }
+
+ "A gRPC Projection" must {
+ "receive events" in new TestFixture { // events persisted before and after the projection starts must all reach the handler, in order
+ entity ! TestEntity.Persist("a")
+ entity ! TestEntity.Persist("b")
+ entity ! TestEntity.Ping(replyProbe.ref)
+ replyProbe.receiveMessage() // wait for the Ping reply; presumably this means both Persist commands were handled first (confirm)
+
+ // start the projection
+ val projection = spawnAtLeastOnceProjection()
+
+ val processedA = processedProbe.receiveMessage()
+ processedA.envelope.persistenceId shouldBe pid.id
+ processedA.envelope.sequenceNr shouldBe 1L
+ processedA.envelope.event shouldBe "A" // upper-cased by the transformation registered in beforeAll
+
+ val processedB = processedProbe.receiveMessage()
+ processedB.envelope.persistenceId shouldBe pid.id
+ processedB.envelope.sequenceNr shouldBe 2L
+ processedB.envelope.event shouldBe "B"
+
+ entity ! TestEntity.Persist("c") // persisted while the projection is already running
+ val processedC = processedProbe.receiveMessage()
+ processedC.envelope.persistenceId shouldBe pid.id
+ processedC.envelope.sequenceNr shouldBe 3L
+ processedC.envelope.event shouldBe "C"
+
+ projection ! ProjectionBehavior.Stop
+ entity ! TestEntity.Stop(replyProbe.ref)
+ processedProbe.expectTerminated(projection) // both actors must have terminated before the test ends
+ processedProbe.expectTerminated(entity)
+ }
+
+ "filter out events" in new TestFixture { // an event dropped by the producer-side transformation must not reach the handler
+ entity ! TestEntity.Persist("a")
+ entity ! TestEntity.Persist("b*") // contains "*", which the transformation in beforeAll maps to None
+ entity ! TestEntity.Persist("c")
+ entity ! TestEntity.Ping(replyProbe.ref)
+ replyProbe.receiveMessage() // wait for the Ping reply before starting the projection
+
+ // start the projection
+ val projection = spawnAtLeastOnceProjection()
+
+ val processedA = processedProbe.receiveMessage()
+ processedA.envelope.persistenceId shouldBe pid.id
+ processedA.envelope.sequenceNr shouldBe 1L
+ processedA.envelope.event shouldBe "A"
+
+ // b* is filtered out by the registered transformation
+
+ val processedC = processedProbe.receiveMessage()
+ processedC.envelope.persistenceId shouldBe pid.id
+ processedC.envelope.sequenceNr shouldBe 3L // the next delivered event is seqNr 3; seqNr 2 was the filtered event
+ processedC.envelope.event shouldBe "C"
+
+ projection ! ProjectionBehavior.Stop
+ entity ! TestEntity.Stop(replyProbe.ref)
+
+ processedProbe.expectTerminated(projection)
+ processedProbe.expectTerminated(entity)
+ }
+
+ "resume from offset" in new TestFixture { // a second projection with the same projectionId must only see events it has not processed yet
+ entity ! TestEntity.Persist("a")
+ entity ! TestEntity.Persist("b")
+ entity ! TestEntity.Ping(replyProbe.ref)
+ replyProbe.receiveMessage() // wait for the Ping reply before starting the projection
+
+ // start the projection
+ val projection = spawnExactlyOnceProjection()
+
+ processedProbe.receiveMessage().envelope.event shouldBe "A"
+ processedProbe.receiveMessage().envelope.event shouldBe "B"
+ processedProbe.expectNoMessage() // nothing beyond A and B is delivered
+
+ projection ! ProjectionBehavior.Stop
+ processedProbe.expectTerminated(projection) // make sure the first projection is gone before starting the second
+ // start new projection
+ val projection2 = spawnExactlyOnceProjection()
+
+ entity ! TestEntity.Persist("c")
+ processedProbe.receiveMessage().envelope.event shouldBe "C" // first message after the restart is C, so A and B were not redelivered
+
+ processedProbe.expectNoMessage()
+ projection2 ! ProjectionBehavior.Stop
+ entity ! TestEntity.Stop(replyProbe.ref)
+
+ processedProbe.expectTerminated(projection2)
+ processedProbe.expectTerminated(entity)
+ }
+
+ "deduplicate backtracking events" in new TestFixture { // backtracking events are logged for each seqNr, yet the handler sees every event only once
+ entity ! TestEntity.Persist("a")
+ entity ! TestEntity.Persist("b")
+ entity ! TestEntity.Persist("c")
+ entity ! TestEntity.Ping(replyProbe.ref)
+ replyProbe.receiveMessage() // wait for the Ping reply before starting the projection
+
+ def expectedLogMessage(seqNr: Long): String =
+ s"Received backtracking event from [127.0.0.1] persistenceId
[${pid.id}] with seqNr [$seqNr]"
+ val projection =
+ LoggingTestKit.trace(expectedLogMessage(1)).expect { // nested expectations: all three TRACE messages must be logged while the block runs
+ LoggingTestKit.trace(expectedLogMessage(2)).expect {
+ LoggingTestKit.trace(expectedLogMessage(3)).expect {
+ // start the projection
+ spawnExactlyOnceProjection()
+ }
+ }
+ }
+
+ processedProbe.receiveMessage().envelope.event shouldBe "A"
+ processedProbe.receiveMessage().envelope.event shouldBe "B"
+ processedProbe.receiveMessage().envelope.event shouldBe "C"
+
+ processedProbe.expectNoMessage() // no fourth message: the backtracking duplicates did not reach the handler
+ projection ! ProjectionBehavior.Stop
+ entity ! TestEntity.Stop(replyProbe.ref)
+
+ processedProbe.expectTerminated(projection)
+ processedProbe.expectTerminated(entity)
+ }
+ }
+
+}
diff --git
a/grpc-int-test/src/test/scala/org/apache/pekko/projection/grpc/TestContainerConf.scala
b/grpc-int-test/src/test/scala/org/apache/pekko/projection/grpc/TestContainerConf.scala
new file mode 100644
index 0000000..d0e78ea
--- /dev/null
+++
b/grpc-int-test/src/test/scala/org/apache/pekko/projection/grpc/TestContainerConf.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2009-2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.projection.grpc
+
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
+import org.apache.pekko.testkit.SocketUtil
+import org.testcontainers.containers.startupcheck.IsRunningStartupCheckStrategy
+import org.testcontainers.postgresql.PostgreSQLContainer
+
+/**
+ * Starts a PostgreSQL test container when constructed and exposes a config that points
+ * pekko-persistence-r2dbc at it. Callers are expected to call `stop()` when done.
+ */
+class TestContainerConf {
+  val grpcPort: Int = SocketUtil.temporaryServerAddress("127.0.0.1").getPort
+
+  // The container is configured and started eagerly, as part of construction.
+  private val postgres = new PostgreSQLContainer("postgres:18.0")
+  postgres.withInitScript("db/default-init.sql")
+  postgres.withStartupCheckStrategy(new IsRunningStartupCheckStrategy)
+  postgres.withStartupAttempts(5)
+  postgres.start()
+
+  /** Container-specific settings layered on top of `persistence.conf`; rebuilt on every call. */
+  def config: Config = {
+    val containerSettings = ConfigFactory.parseString(s"""
+ pekko.http.server.preview.enable-http2 = on
+ pekko.projection.grpc {
+ consumer.client {
+ host = "127.0.0.1"
+ port = $grpcPort
+ use-tls = false
+ }
+ producer {
+ query-plugin-id = "pekko.persistence.r2dbc.query"
+ }
+ }
+ pekko.persistence.r2dbc {
+ # yugabyte or postgres
+ dialect = "postgres"
+ connection-factory {
+ driver = "postgres"
+ host = "${postgres.getHost}"
+ port = ${postgres.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT)}
+ database = "${postgres.getDatabaseName}"
+ user = "${postgres.getUsername}"
+ password = "${postgres.getPassword}"
+ }
+ }
+ """)
+    val persistenceDefaults = ConfigFactory.load("persistence.conf")
+    containerSettings.withFallback(persistenceDefaults)
+  }
+
+  /** Stops the PostgreSQL container. */
+  def stop(): Unit = postgres.stop()
+}
diff --git
a/grpc-int-test/src/test/scala/org/apache/pekko/projection/grpc/TestData.scala
b/grpc-int-test/src/test/scala/org/apache/pekko/projection/grpc/TestData.scala
new file mode 100644
index 0000000..b1eeb89
--- /dev/null
+++
b/grpc-int-test/src/test/scala/org/apache/pekko/projection/grpc/TestData.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.projection.grpc
+
+import java.util.UUID
+import java.util.concurrent.atomic.AtomicLong
+
+import org.apache.pekko.persistence.typed.PersistenceId
+import org.apache.pekko.projection.ProjectionId
+
+object TestData {
+  // could be something more unique, like currentTimeMillis
+  private val start = 0L
+
+  // Counters live in the companion object, so every suite mixing in the trait shares them.
+  private val pidCounter = new AtomicLong(start)
+  private val entityTypeCounter = new AtomicLong(start)
+}
+
+/** Mixin that generates persistence ids, entity type names and projection ids for tests. */
+trait TestData {
+
+  /** Next persistence id of the given entity type, with an entity id of the form `p-<counter>`. */
+  def nextPid(entityType: String): PersistenceId = {
+    val entityId = s"p-${TestData.pidCounter.incrementAndGet()}"
+    PersistenceId(entityType, entityId)
+  }
+
+  /** Next entity type name of the form `TestEntity-<counter>`. */
+  def nextEntityType(): String = {
+    val n = TestData.entityTypeCounter.incrementAndGet()
+    s"TestEntity-$n"
+  }
+
+  /** Projection id with a random UUID as name and the fixed key "00". */
+  def randomProjectionId(): ProjectionId = {
+    val name = UUID.randomUUID().toString
+    ProjectionId(name, "00")
+  }
+
+}
diff --git
a/grpc-int-test/src/test/scala/org/apache/pekko/projection/grpc/TestDbLifecycle.scala
b/grpc-int-test/src/test/scala/org/apache/pekko/projection/grpc/TestDbLifecycle.scala
new file mode 100644
index 0000000..44884d6
--- /dev/null
+++
b/grpc-int-test/src/test/scala/org/apache/pekko/projection/grpc/TestDbLifecycle.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2009-2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.projection.grpc
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+import org.apache.pekko
+import pekko.actor.typed.ActorSystem
+import pekko.persistence.Persistence
+import pekko.persistence.r2dbc.ConnectionFactoryProvider
+import pekko.persistence.r2dbc.JournalSettings
+import pekko.persistence.r2dbc.internal.R2dbcExecutor
+import pekko.projection.r2dbc.R2dbcProjectionSettings
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.Suite
+import org.slf4j.LoggerFactory
+
+/**
+ * Suite mixin that empties the journal table and the timestamp offset table
+ * before the suite's own `beforeAll` logic runs.
+ */
+trait TestDbLifecycle extends BeforeAndAfterAll { this: Suite =>
+
+  def typedSystem: ActorSystem[_]
+
+  def testConfigPath: String = "pekko.projection.r2dbc"
+
+  lazy val r2dbcProjectionSettings: R2dbcProjectionSettings = {
+    val projectionConfig = typedSystem.settings.config.getConfig(testConfigPath)
+    R2dbcProjectionSettings(projectionConfig)
+  }
+
+  lazy val r2dbcExecutor: R2dbcExecutor = {
+    val connectionFactory =
+      ConnectionFactoryProvider(typedSystem).connectionFactoryFor(r2dbcProjectionSettings.useConnectionFactory)
+    val log = LoggerFactory.getLogger(getClass)
+    new R2dbcExecutor(connectionFactory, log, r2dbcProjectionSettings.logDbCallsExceeding)(
+      typedSystem.executionContext,
+      typedSystem)
+  }
+
+  lazy val persistenceExt: Persistence = Persistence(typedSystem)
+
+  override protected def beforeAll(): Unit = {
+    val journalSettings =
+      JournalSettings(typedSystem.settings.config.getConfig("pekko.persistence.r2dbc.journal"))
+
+    // Deletes every row of `table`, waiting up to 10 seconds for the statement to finish.
+    def deleteAllFrom(table: String): Unit = {
+      val deletion = r2dbcExecutor.updateOne("beforeAll delete")(_.createStatement(s"delete from $table"))
+      Await.result(deletion, 10.seconds)
+      ()
+    }
+
+    deleteAllFrom(journalSettings.journalTableWithSchema)
+    deleteAllFrom(r2dbcProjectionSettings.timestampOffsetTableWithSchema)
+    super.beforeAll()
+  }
+
+}
diff --git
a/grpc-int-test/src/test/scala/org/apache/pekko/projection/grpc/TestEntity.scala
b/grpc-int-test/src/test/scala/org/apache/pekko/projection/grpc/TestEntity.scala
new file mode 100644
index 0000000..964842f
--- /dev/null
+++
b/grpc-int-test/src/test/scala/org/apache/pekko/projection/grpc/TestEntity.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2009-2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.projection.grpc
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.typed.ActorRef
+import pekko.actor.typed.Behavior
+import pekko.actor.typed.scaladsl.Behaviors
+import pekko.actor.typed.scaladsl.LoggerOps
+import pekko.persistence.typed.PersistenceId
+import pekko.persistence.typed.scaladsl.Effect
+import pekko.persistence.typed.scaladsl.EventSourcedBehavior
+
+/**
+ * Minimal event sourced entity for tests: persists whatever payload it is told to,
+ * keeps a constant empty-string state, and replies `Done` to `Ping` and `Stop`.
+ */
+object TestEntity {
+  sealed trait Command
+  final case class Persist(payload: Any) extends Command
+  final case class Ping(replyTo: ActorRef[Done]) extends Command
+  final case class Stop(replyTo: ActorRef[Done]) extends Command
+
+  def apply(pid: PersistenceId): Behavior[Command] =
+    Behaviors.setup { context =>
+      // The state is never inspected, so the handler only needs the command.
+      def onCommand(command: Command): Effect[Any, String] =
+        command match {
+          case Persist(payload) =>
+            val nextSeqNr = EventSourcedBehavior.lastSequenceNumber(context) + 1
+            context.log.debugN("Persist [{}], pid [{}], seqNr [{}]", payload, pid.id, nextSeqNr)
+            Effect.persist(payload)
+          case Ping(replyTo) =>
+            replyTo ! Done
+            Effect.none
+          case Stop(replyTo) =>
+            replyTo ! Done
+            Effect.stop()
+        }
+
+      EventSourcedBehavior[Command, Any, String](
+        persistenceId = pid,
+        emptyState = "",
+        commandHandler = (_, command) => onCommand(command),
+        eventHandler = (_, _) => "")
+    }
+}
diff --git
a/grpc-int-test/src/test/scala/org/apache/pekko/projection/grpc/consumer/scaladsl/EventTimestampQuerySpec.scala
b/grpc-int-test/src/test/scala/org/apache/pekko/projection/grpc/consumer/scaladsl/EventTimestampQuerySpec.scala
new file mode 100644
index 0000000..b03a7f4
--- /dev/null
+++
b/grpc-int-test/src/test/scala/org/apache/pekko/projection/grpc/consumer/scaladsl/EventTimestampQuerySpec.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.projection.grpc.consumer.scaladsl
+
+import java.time.Instant
+import java.time.{ Duration => JDuration }
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.concurrent.duration._
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.testkit.typed.scaladsl.LogCapturing
+import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
+import pekko.actor.typed.ActorSystem
+import pekko.grpc.GrpcClientSettings
+import pekko.grpc.scaladsl.ServiceHandler
+import pekko.http.scaladsl.Http
+import pekko.http.scaladsl.model.HttpRequest
+import pekko.http.scaladsl.model.HttpResponse
+import pekko.projection.grpc.TestContainerConf
+import pekko.projection.grpc.TestData
+import pekko.projection.grpc.TestDbLifecycle
+import pekko.projection.grpc.TestEntity
+import pekko.projection.grpc.consumer.GrpcQuerySettings
+import pekko.projection.grpc.producer.EventProducerSettings
+import pekko.projection.grpc.producer.scaladsl.EventProducer
+import
pekko.projection.grpc.producer.scaladsl.EventProducer.EventProducerSource
+import pekko.projection.grpc.producer.scaladsl.EventProducer.Transformation
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.wordspec.AnyWordSpecLike
+
+/**
+ * Integration test for looking up event timestamps with `GrpcReadJournal.timestampOf`.
+ *
+ * `beforeAll` binds an event producer gRPC service for this spec's `entityType` and `streamId`
+ * on `127.0.0.1` at `testContainerConf.grpcPort`. The tests persist events with a `TestEntity`
+ * and read their timestamps back through the `GrpcReadJournal` of the `TestFixture`.
+ */
+class EventTimestampQuerySpec(testContainerConf: TestContainerConf)
+    extends ScalaTestWithActorTestKit(testContainerConf.config)
+    with AnyWordSpecLike
+    with TestDbLifecycle
+    with TestData
+    with BeforeAndAfterAll
+    with LogCapturing {
+
+  def this() = this(new TestContainerConf)
+
+  /** Runs the inherited shutdown and then calls `testContainerConf.stop()`. */
+  protected override def afterAll(): Unit =
+    // `stop()` sits in `finally` so that it still runs when the inherited shutdown throws.
+    try super.afterAll()
+    finally testContainerConf.stop()
+
+  override def typedSystem: ActorSystem[_] = system
+  private implicit val ec: ExecutionContext = system.executionContext
+  private val entityType = nextEntityType()
+  private val streamId = "stream_id_" + entityType
+
+  /**
+   * Per-test fixture: a persistence id from `nextPid`, a `Done` reply probe, and a lazily
+   * spawned entity and lazily created read journal.
+   */
+  class TestFixture {
+    val pid = nextPid(entityType)
+
+    val replyProbe = createTestProbe[Done]()
+
+    lazy val entity = spawn(TestEntity(pid))
+
+    // Client settings are read from the actor system configuration.
+    lazy val grpcReadJournal = GrpcReadJournal(
+      GrpcQuerySettings(streamId),
+      GrpcClientSettings.fromConfig(system.settings.config.getConfig("pekko.projection.grpc.consumer.client")),
+      protobufDescriptors = Nil)
+  }
+
+  /** Binds the event producer service and waits until the server binding has completed. */
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+
+    val eventProducerSource =
+      EventProducerSource(entityType, streamId, Transformation.identity, EventProducerSettings(system))
+
+    val eventProducerService =
+      EventProducer.grpcServiceHandler(eventProducerSource)
+
+    val service: HttpRequest => Future[HttpResponse] =
+      ServiceHandler.concatOrNotFound(eventProducerService)
+
+    val bound =
+      Http()
+        .newServerAt("127.0.0.1", testContainerConf.grpcPort)
+        .bind(service)
+        .map(_.addToCoordinatedShutdown(3.seconds))
+
+    bound.futureValue
+  }
+
+  "GrpcReadJournal with EventTimestampQuery" must {
+    "lookup event timestamp" in new TestFixture {
+      entity ! TestEntity.Persist("a")
+      entity ! TestEntity.Persist("b")
+      // Wait for the reply to the Ping before querying the journal.
+      entity ! TestEntity.Ping(replyProbe.ref)
+      replyProbe.receiveMessage()
+
+      // A missing timestamp is reported as a test failure instead of a bare `None.get`.
+      val timestampA =
+        grpcReadJournal
+          .timestampOf(pid.id, sequenceNr = 1L)
+          .futureValue
+          .getOrElse(fail("No timestamp found for sequenceNr 1"))
+      JDuration
+        .between(timestampA, Instant.now())
+        .toMillis should (be >= 0L and be <= 3000L)
+
+      val timestampB =
+        grpcReadJournal
+          .timestampOf(pid.id, sequenceNr = 2L)
+          .futureValue
+          .getOrElse(fail("No timestamp found for sequenceNr 2"))
+      JDuration
+        .between(timestampB, Instant.now())
+        .toMillis should (be >= 0L and be <= 3000L)
+
+      // Equal timestamps are accepted; event B must just not be older than event A.
+      timestampB.isBefore(timestampA) shouldBe false
+    }
+
+    "handle missing event as None" in new TestFixture {
+      // Nothing was persisted for this pid, so sequence number 13 does not exist.
+      grpcReadJournal
+        .timestampOf(pid.id, sequenceNr = 13L)
+        .futureValue
+        .isEmpty shouldBe true
+    }
+  }
+
+}
diff --git a/grpc-int-test/src/test/scala/org/apache/pekko/projection/grpc/consumer/scaladsl/LoadEventQuerySpec.scala b/grpc-int-test/src/test/scala/org/apache/pekko/projection/grpc/consumer/scaladsl/LoadEventQuerySpec.scala
new file mode 100644
index 0000000..0216507
--- /dev/null
+++ b/grpc-int-test/src/test/scala/org/apache/pekko/projection/grpc/consumer/scaladsl/LoadEventQuerySpec.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.projection.grpc.consumer.scaladsl
+
+import scala.concurrent.Await
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.concurrent.duration._
+
+import org.apache.pekko
+import pekko.Done
+import pekko.NotUsed
+import pekko.actor.testkit.typed.scaladsl.LogCapturing
+import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
+import pekko.actor.typed.ActorSystem
+import pekko.grpc.GrpcClientSettings
+import pekko.grpc.scaladsl.ServiceHandler
+import pekko.http.scaladsl.Http
+import pekko.http.scaladsl.model.HttpRequest
+import pekko.http.scaladsl.model.HttpResponse
+import pekko.projection.grpc.TestContainerConf
+import pekko.projection.grpc.TestDbLifecycle
+import pekko.projection.grpc.TestEntity
+import pekko.projection.grpc.TestData
+import pekko.projection.grpc.consumer.GrpcQuerySettings
+import pekko.projection.grpc.producer.EventProducerSettings
+import pekko.projection.grpc.producer.scaladsl.EventProducer
+import
pekko.projection.grpc.producer.scaladsl.EventProducer.EventProducerSource
+import pekko.projection.grpc.producer.scaladsl.EventProducer.Transformation
+import io.grpc.Status
+import io.grpc.StatusRuntimeException
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.wordspec.AnyWordSpecLike
+
+/**
+ * Integration test for loading single events with `GrpcReadJournal.loadEnvelope`.
+ *
+ * `beforeAll` binds an event producer gRPC service on `127.0.0.1` at
+ * `testContainerConf.grpcPort`. Its transformation maps `String` events that contain `*` to
+ * `None` and upper-cases all other `String` events. The tests persist events with a
+ * `TestEntity` and load them through the `GrpcReadJournal` of the `TestFixture`.
+ */
+class LoadEventQuerySpec(testContainerConf: TestContainerConf)
+    extends ScalaTestWithActorTestKit(testContainerConf.config)
+    with AnyWordSpecLike
+    with TestDbLifecycle
+    with TestData
+    with BeforeAndAfterAll
+    with LogCapturing {
+
+  def this() = this(new TestContainerConf)
+
+  override def typedSystem: ActorSystem[_] = system
+  private implicit val ec: ExecutionContext = system.executionContext
+  private val entityType = nextEntityType()
+  private val streamId = "stream_id_" + entityType
+
+  /** Runs the inherited shutdown and then calls `testContainerConf.stop()`. */
+  protected override def afterAll(): Unit =
+    // `stop()` sits in `finally` so that it still runs when the inherited shutdown throws.
+    try super.afterAll()
+    finally testContainerConf.stop()
+
+  /**
+   * Per-test fixture: a `Done` reply probe, a persistence id from `nextPid`, and a lazily
+   * spawned entity and lazily created read journal.
+   */
+  class TestFixture {
+
+    val replyProbe = createTestProbe[Done]()
+    val pid = nextPid(entityType)
+
+    lazy val entity = spawn(TestEntity(pid))
+
+    // Plain-text (non-TLS) client that connects to the address bound in `beforeAll`.
+    lazy val grpcReadJournal = GrpcReadJournal(
+      GrpcQuerySettings(streamId),
+      GrpcClientSettings
+        .connectToServiceAt("127.0.0.1", testContainerConf.grpcPort)
+        .withTls(false),
+      protobufDescriptors = Nil)
+  }
+
+  /** Binds the event producer service and waits until the server binding has completed. */
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+
+    // Events containing "*" are filtered out (mapped to None); the rest are upper-cased.
+    val transformation =
+      Transformation.empty.registerAsyncMapper((event: String) => {
+        if (event.contains("*"))
+          Future.successful(None)
+        else
+          Future.successful(Some(event.toUpperCase))
+      })
+
+    val eventProducerSource =
+      EventProducerSource(entityType, streamId, transformation, EventProducerSettings(system))
+
+    val eventProducerService =
+      EventProducer.grpcServiceHandler(eventProducerSource)
+
+    val service: HttpRequest => Future[HttpResponse] =
+      ServiceHandler.concatOrNotFound(eventProducerService)
+
+    val bound =
+      Http()
+        .newServerAt("127.0.0.1", testContainerConf.grpcPort)
+        .bind(service)
+        .map(_.addToCoordinatedShutdown(3.seconds))
+
+    bound.futureValue
+  }
+
+  "GrpcReadJournal with LoadEventQuery" must {
+    "load event" in new TestFixture {
+      entity ! TestEntity.Persist("a")
+      entity ! TestEntity.Persist("b")
+      // Wait for the reply to the Ping before querying the journal.
+      entity ! TestEntity.Ping(replyProbe.ref)
+      replyProbe.receiveMessage()
+
+      // The producer transformation upper-cases the events.
+      grpcReadJournal
+        .loadEnvelope[String](pid.id, sequenceNr = 1L)
+        .futureValue
+        .event shouldBe "A"
+
+      grpcReadJournal
+        .loadEnvelope[String](pid.id, sequenceNr = 2L)
+        .futureValue
+        .event shouldBe "B"
+    }
+
+    "load filtered event" in new TestFixture {
+      // "a*" contains "*", so the producer transformation maps it to None.
+      entity ! TestEntity.Persist("a*")
+      entity ! TestEntity.Ping(replyProbe.ref)
+      replyProbe.receiveMessage()
+
+      val env = grpcReadJournal
+        .loadEnvelope[String](pid.id, sequenceNr = 1L)
+        .futureValue
+      env.eventOption.isEmpty shouldBe true
+      env.eventMetadata shouldBe Some(NotUsed)
+    }
+
+    "handle missing event as NOT_FOUND" in new TestFixture {
+      // Nothing was persisted for this pid, so sequence number 13 does not exist.
+      val status =
+        intercept[StatusRuntimeException] {
+          Await.result(grpcReadJournal.loadEnvelope[String](pid.id, sequenceNr = 13L), replyProbe.remainingOrDefault)
+          fail("Expected NOT_FOUND")
+        }.getStatus
+      status.getCode shouldBe Status.NOT_FOUND.getCode
+    }
+  }
+
+}
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 5c62403..f283691 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -26,6 +26,7 @@ object Dependencies {
val pekko = PekkoCoreDependency.version
val pekkoGrpc = org.apache.pekko.grpc.gen.BuildInfo.version
val pekkoPersistenceJdbc = "2.0.0-M1"
+ // NOTE(review): this is a SNAPSHOT version; confirm it is replaced by a released version
+ // before anything that depends on it is published.
+ val pekkoPersistenceR2dbc = "2.0.0-M0+109-880ba593-SNAPSHOT"
val pekkoPersistenceCassandra = "1.1.0"
val connectors = PekkoConnectorsDependency.version
val connectorsKafka = PekkoConnectorsKafkaDependency.version
@@ -212,6 +213,18 @@ object Dependencies {
Test.logback % "test",
Test.scalatest % "test")
+ // Dependencies appended to `deps` for the gRPC integration tests
+ // (presumably wired to the grpc-int-test module in build.sbt - confirm).
+ // pekko-persistence-r2dbc and pekko-projection-r2dbc both use Versions.pekkoPersistenceR2dbc.
+ // NOTE(review): Test.pekkoSerializationJackson is the only entry without an explicit
+ // "test" configuration in this list - confirm that is intended.
+ val grpcIntTest =
+ deps ++= Seq(
+ "org.apache.pekko" %% "pekko-persistence-r2dbc" %
Versions.pekkoPersistenceR2dbc % "test",
+ "org.apache.pekko" %% "pekko-projection-r2dbc" %
Versions.pekkoPersistenceR2dbc % "test",
+ "org.postgresql" % "r2dbc-postgresql" % "1.1.1.RELEASE" % "test",
+ Test.postgresDriver % "test",
+ Test.pekkoSerializationJackson,
+ Test.pekkoTypedTestkit % "test",
+ Test.postgresContainer % "test",
+ Test.logback % "test",
+ Test.scalatest % "test")
+
val kafkaTest =
deps ++= Seq(
Test.scalatest % "test",
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]