This is an automated email from the ASF dual-hosted git repository.

pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-projection.git


The following commit(s) were added to refs/heads/main by this push:
     new fcd4598  add grpc int tests module (#467)
fcd4598 is described below

commit fcd459898c2d2b3ed3d30efd644d5d6d65f6bfcc
Author: PJ Fanning <[email protected]>
AuthorDate: Fri May 8 12:33:44 2026 +0100

    add grpc int tests module (#467)
    
    * Add grpc-int-test module with integration tests ported from 
akka-projection
    
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko-projection/sessions/bea6fbd0-2080-45d8-beb2-b0874db6b9e4
    
    Co-authored-by: pjfanning <[email protected]>
    
    * Fix EventTimestampQuerySpec: use timestampB in assertion for event B
    
    Agent-Logs-Url: 
https://github.com/pjfanning/incubator-pekko-projection/sessions/bea6fbd0-2080-45d8-beb2-b0874db6b9e4
    
    Co-authored-by: pjfanning <[email protected]>
    
    * Update test dependencies to use lowercase 'test'
    
    * compile issue
    
    * Update TestContainerConf.scala
    
    * workflow
    
    * Update Dependencies.scala
    
    * snapshot jars
    
    * Update Dependencies.scala
    
    * Update build.sbt
    
    * Update TestEntity.scala
    
    ---------
    
    Co-authored-by: copilot-swe-agent[bot] 
<[email protected]>
    Co-authored-by: pjfanning <[email protected]>
---
 .github/workflows/integration-tests-grpc.yml       |  63 ++++
 build.sbt                                          |  17 +-
 .../src/test/resources/db/default-init.sql         |  96 ++++++
 grpc-int-test/src/test/resources/logback-test.xml  |  40 +++
 grpc-int-test/src/test/resources/persistence.conf  |  34 ++
 .../pekko/projection/grpc/IntegrationSpec.scala    | 345 +++++++++++++++++++++
 .../pekko/projection/grpc/TestContainerConf.scala  |  61 ++++
 .../apache/pekko/projection/grpc/TestData.scala    |  41 +++
 .../pekko/projection/grpc/TestDbLifecycle.scala    |  62 ++++
 .../apache/pekko/projection/grpc/TestEntity.scala  |  57 ++++
 .../scaladsl/EventTimestampQuerySpec.scala         | 129 ++++++++
 .../consumer/scaladsl/LoadEventQuerySpec.scala     | 149 +++++++++
 project/Dependencies.scala                         |  13 +
 13 files changed, 1105 insertions(+), 2 deletions(-)

diff --git a/.github/workflows/integration-tests-grpc.yml 
b/.github/workflows/integration-tests-grpc.yml
new file mode 100644
index 0000000..9afe7a4
--- /dev/null
+++ b/.github/workflows/integration-tests-grpc.yml
@@ -0,0 +1,63 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# license agreements; and to You under the Apache License, version 2.0:
+#
+#   https://www.apache.org/licenses/LICENSE-2.0
+#
+# This file is part of the Apache Pekko project, which was derived from Akka.
+#
+
+name: Integration Tests for gRPC
+
+on:
+  pull_request:
+  push:
+    branches:
+      - main
+    tags-ignore: [ v.* ]
+
+permissions:
+  contents: read
+
+jobs:
+  test:
+    name: Build and Test Integration for gRPC
+    runs-on: ubuntu-22.04
+    strategy:
+      fail-fast: false
+      matrix:
+        include:
+          - { java-version: 17, scala-version: 2.13,  sbt-opts: '' }
+
+    steps:
+      - name: Checkout
+        uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # 
v6.0.2
+        with:
+          fetch-depth: 0
+          fetch-tags: true
+
+      - name: Checkout GitHub merge
+        if: github.event.pull_request
+        run: |-
+          git fetch origin pull/${{ github.event.pull_request.number 
}}/merge:scratch
+          git checkout scratch
+
+      - name: Cache Coursier cache
+        uses: coursier/cache-action@90c37294538be80a558fd665531fcdc2b467b475 # 
v8.1.0
+
+      - name: Setup JDK ${{ matrix.java-version }}
+        uses: actions/setup-java@be666c2fcd27ec809703dec50e508c2fdc7f6654 # 
v5.2.0
+        with:
+          java-version: ${{ matrix.java-version }}
+          distribution: temurin
+
+      - name: Install sbt
+        uses: sbt/setup-sbt@508b753e53cb6095967669e0911487d2b9bc9f41 # v1.1.22
+
+      - name: Run all integration tests with default Scala and Java ${{ 
matrix.java-version }}
+        run: sbt "grpc-int-test/test" ${{ matrix.sbt-opts }}
+        env: # Disable Ryuk resource reaper since we always spin up fresh VMs
+          TESTCONTAINERS_RYUK_DISABLED: true
+
+      - name: Print logs on failure
+        if: ${{ failure() }}
+        run: find . -name "*.log" -exec ./scripts/cat-log.sh {} \;
diff --git a/build.sbt b/build.sbt
index 63e3521..aaca412 100644
--- a/build.sbt
+++ b/build.sbt
@@ -179,6 +179,20 @@ lazy val grpc =
     .dependsOn(eventsourced)
     .dependsOn(testkit % Test)
 
+lazy val grpcIntTest =
+  Project(id = "grpc-int-test", base = file("grpc-int-test"))
+    .disablePlugins(MimaPlugin)
+    .settings(Dependencies.grpcIntTest)
+    .settings(
+      name := "pekko-projection-grpc-int-test",
+      publish / skip := true,
+      Test / parallelExecution := false,
+      // we need to access snapshot jars for pekko-persistence-r2dbc
+      resolvers += Resolver.ApacheMavenSnapshotsRepo)
+    .dependsOn(grpc % "test->test;test->compile")
+    .dependsOn(eventsourced % Test)
+    .dependsOn(testkit % Test)
+
 lazy val userProjects: Seq[ProjectReference] = List[ProjectReference](
   core, jdbc, slick, cassandra, eventsourced, kafka, `durable-state`, grpc, 
testkit)
 
@@ -264,8 +278,7 @@ lazy val billOfMaterials = Project("bill-of-materials", 
file("bill-of-materials"
 
 lazy val root = Project(id = "projection", base = file("."))
   .aggregate(userProjects: _*)
-  .aggregate(billOfMaterials, coreTest, kafkaTest, cassandraTest, jdbcIntTest, 
slickIntTest, examples,
-    integrationExamples, docs)
+  .aggregate(billOfMaterials, coreTest, kafkaTest, cassandraTest, examples, 
integrationExamples, docs)
   .settings(
     publish / skip := true,
     name := "pekko-projection-root")
diff --git a/grpc-int-test/src/test/resources/db/default-init.sql 
b/grpc-int-test/src/test/resources/db/default-init.sql
new file mode 100644
index 0000000..7bf59c8
--- /dev/null
+++ b/grpc-int-test/src/test/resources/db/default-init.sql
@@ -0,0 +1,96 @@
+-- Note: duplicated from ddl-scripts to be available on test classpath
+CREATE TABLE IF NOT EXISTS event_journal(
+  slice INT NOT NULL,
+  entity_type VARCHAR(255) NOT NULL,
+  persistence_id VARCHAR(255) NOT NULL,
+  seq_nr BIGINT NOT NULL,
+  db_timestamp timestamp with time zone NOT NULL,
+
+  event_ser_id INTEGER NOT NULL,
+  event_ser_manifest VARCHAR(255) NOT NULL,
+  event_payload BYTEA NOT NULL,
+
+  deleted BOOLEAN DEFAULT FALSE NOT NULL,
+  writer VARCHAR(255) NOT NULL,
+  adapter_manifest VARCHAR(255),
+  tags TEXT ARRAY,
+
+  meta_ser_id INTEGER,
+  meta_ser_manifest VARCHAR(255),
+  meta_payload BYTEA,
+
+  PRIMARY KEY(persistence_id, seq_nr)
+);
+
+-- `event_journal_slice_idx` is only needed if the slice based queries are used
+CREATE INDEX IF NOT EXISTS event_journal_slice_idx ON event_journal(slice, 
entity_type, db_timestamp, seq_nr);
+
+CREATE TABLE IF NOT EXISTS snapshot(
+  slice INT NOT NULL,
+  entity_type VARCHAR(255) NOT NULL,
+  persistence_id VARCHAR(255) NOT NULL,
+  seq_nr BIGINT NOT NULL,
+  write_timestamp BIGINT NOT NULL,
+  ser_id INTEGER NOT NULL,
+  ser_manifest VARCHAR(255) NOT NULL,
+  snapshot BYTEA NOT NULL,
+  meta_ser_id INTEGER,
+  meta_ser_manifest VARCHAR(255),
+  meta_payload BYTEA,
+
+  PRIMARY KEY(persistence_id)
+);
+
+CREATE TABLE IF NOT EXISTS durable_state (
+  slice INT NOT NULL,
+  entity_type VARCHAR(255) NOT NULL,
+  persistence_id VARCHAR(255) NOT NULL,
+  revision BIGINT NOT NULL,
+  db_timestamp timestamp with time zone NOT NULL,
+
+  state_ser_id INTEGER NOT NULL,
+  state_ser_manifest VARCHAR(255),
+  state_payload BYTEA NOT NULL,
+  tags TEXT ARRAY,
+
+  PRIMARY KEY(persistence_id, revision)
+);
+
+-- `durable_state_slice_idx` is only needed if the slice based queries are used
+CREATE INDEX IF NOT EXISTS durable_state_slice_idx ON durable_state(slice, 
entity_type, db_timestamp, revision);
+
+-- Primitive offset types are stored in this table.
+-- If only timestamp based offsets are used this table is optional.
+-- Configure pekko.projection.r2dbc.offset-store.offset-table="" if the table 
is not created.
+CREATE TABLE IF NOT EXISTS projection_offset_store (
+  projection_name VARCHAR(255) NOT NULL,
+  projection_key VARCHAR(255) NOT NULL,
+  current_offset VARCHAR(255) NOT NULL,
+  manifest VARCHAR(32) NOT NULL,
+  mergeable BOOLEAN NOT NULL,
+  last_updated BIGINT NOT NULL,
+  PRIMARY KEY(projection_name, projection_key)
+);
+
+-- Timestamp based offsets are stored in this table.
+CREATE TABLE IF NOT EXISTS projection_timestamp_offset_store (
+  projection_name VARCHAR(255) NOT NULL,
+  projection_key VARCHAR(255) NOT NULL,
+  slice INT NOT NULL,
+  persistence_id VARCHAR(255) NOT NULL,
+  seq_nr BIGINT NOT NULL,
+  -- timestamp_offset is the db_timestamp of the original event
+  timestamp_offset timestamp with time zone NOT NULL,
+  -- timestamp_consumed is when the offset was stored
+  -- the consumer lag is timestamp_consumed - timestamp_offset
+  timestamp_consumed timestamp with time zone NOT NULL,
+  PRIMARY KEY(slice, projection_name, timestamp_offset, persistence_id, seq_nr)
+);
+
+CREATE TABLE IF NOT EXISTS projection_management (
+  projection_name VARCHAR(255) NOT NULL,
+  projection_key VARCHAR(255) NOT NULL,
+  paused BOOLEAN NOT NULL,
+  last_updated BIGINT NOT NULL,
+  PRIMARY KEY(projection_name, projection_key)
+);
diff --git a/grpc-int-test/src/test/resources/logback-test.xml 
b/grpc-int-test/src/test/resources/logback-test.xml
new file mode 100644
index 0000000..2e10b3b
--- /dev/null
+++ b/grpc-int-test/src/test/resources/logback-test.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="utf-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<configuration>
+
+    <appender name="STDOUT" class="ch.qos.logback.core.ConsoleAppender">
+        <encoder>
+            <pattern>[%date{ISO8601}] [%level] [%logger] [%X{pekkoAddress}] 
[%marker] [%thread] - %msg%n</pattern>
+        </encoder>
+    </appender>
+
+    <appender name="CapturingAppender" 
class="org.apache.pekko.actor.testkit.typed.internal.CapturingAppender"/>
+    <logger 
name="org.apache.pekko.actor.testkit.typed.internal.CapturingAppenderDelegate">
+        <appender-ref ref="STDOUT"/>
+    </logger>
+
+    <logger name="org.apache.pekko.projection.grpc" level="TRACE" />
+    <logger name="org.apache.pekko.projection.r2dbc" level="DEBUG" />
+    <logger name="org.apache.pekko.persistence.r2dbc" level="DEBUG" />
+
+
+    <root level="INFO">
+        <appender-ref ref="CapturingAppender"/>
+    </root>
+
+</configuration>
diff --git a/grpc-int-test/src/test/resources/persistence.conf 
b/grpc-int-test/src/test/resources/persistence.conf
new file mode 100644
index 0000000..8149189
--- /dev/null
+++ b/grpc-int-test/src/test/resources/persistence.conf
@@ -0,0 +1,34 @@
+# SPDX-License-Identifier: Apache-2.0
+
+pekko {
+  persistence {
+    journal {
+      plugin = "pekko.persistence.r2dbc.journal"
+    }
+    snapshot-store {
+      plugin = "pekko.persistence.r2dbc.snapshot"
+    }
+    r2dbc {
+      journal {
+        publish-events = on
+      }
+      query {
+        # Note that this can probably be decreased if we can use db time, see 
use-app-timestamp.
+        behind-current-time = 500 millis
+      }
+
+      # We trust that system time will not move backward for two subsequent 
persists from the same entity.
+      db-timestamp-monotonic-increasing = on
+
+      # Enable app-side timestamps as a workaround for databases without 
monotonically increasing db timestamps.
+      use-app-timestamp = on
+    }
+  }
+
+  projection.r2dbc {
+    offset-store {
+      # only timestamp based offsets
+      offset-table = ""
+    }
+  }
+}
diff --git 
a/grpc-int-test/src/test/scala/org/apache/pekko/projection/grpc/IntegrationSpec.scala
 
b/grpc-int-test/src/test/scala/org/apache/pekko/projection/grpc/IntegrationSpec.scala
new file mode 100644
index 0000000..1a6ca84
--- /dev/null
+++ 
b/grpc-int-test/src/test/scala/org/apache/pekko/projection/grpc/IntegrationSpec.scala
@@ -0,0 +1,345 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.projection.grpc
+
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.concurrent.duration._
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.testkit.typed.scaladsl.LogCapturing
+import pekko.actor.testkit.typed.scaladsl.LoggingTestKit
+import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
+import pekko.actor.typed.ActorRef
+import pekko.actor.typed.ActorSystem
+import pekko.actor.typed.scaladsl.LoggerOps
+import pekko.grpc.GrpcClientSettings
+import pekko.grpc.GrpcServiceException
+import pekko.grpc.scaladsl.Metadata
+import pekko.grpc.scaladsl.MetadataBuilder
+import pekko.grpc.scaladsl.ServiceHandler
+import pekko.http.scaladsl.Http
+import pekko.http.scaladsl.model.HttpRequest
+import pekko.http.scaladsl.model.HttpResponse
+import pekko.persistence.query.typed.EventEnvelope
+import pekko.persistence.typed.PersistenceId
+import pekko.projection.ProjectionBehavior
+import pekko.projection.ProjectionId
+import pekko.projection.eventsourced.scaladsl.EventSourcedProvider
+import pekko.projection.grpc.consumer.GrpcQuerySettings
+import pekko.projection.grpc.consumer.scaladsl.GrpcReadJournal
+import pekko.projection.grpc.producer.EventProducerSettings
+import pekko.projection.grpc.producer.scaladsl.EventProducer
+import 
pekko.projection.grpc.producer.scaladsl.EventProducer.EventProducerSource
+import pekko.projection.grpc.producer.scaladsl.EventProducer.Transformation
+import pekko.projection.grpc.producer.scaladsl.EventProducerInterceptor
+import pekko.projection.r2dbc.scaladsl.R2dbcHandler
+import pekko.projection.r2dbc.scaladsl.R2dbcProjection
+import pekko.projection.r2dbc.scaladsl.R2dbcSession
+import pekko.projection.scaladsl.Handler
+import pekko.testkit.SocketUtil
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
+import io.grpc.Status
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.wordspec.AnyWordSpecLike
+import org.slf4j.LoggerFactory
+
+object IntegrationSpec {
+
+  val grpcPort: Int = SocketUtil.temporaryServerAddress("127.0.0.1").getPort
+
+  val config: Config = ConfigFactory
+    .parseString(s"""
+    pekko.http.server.preview.enable-http2 = on
+    pekko.persistence.r2dbc {
+      query {
+        refresh-interval = 500 millis
+        # reducing this to have quicker test, triggers backtracking earlier
+        backtracking.behind-current-time = 3 seconds
+      }
+    }
+    pekko.projection.grpc {
+      producer {
+        query-plugin-id = "pekko.persistence.r2dbc.query"
+      }
+    }
+    pekko.actor.testkit.typed.filter-leeway = 10s
+    """)
+
+  final case class Processed(projectionId: ProjectionId, envelope: 
EventEnvelope[String])
+
+  class TestHandler(projectionId: ProjectionId, probe: ActorRef[Processed]) 
extends Handler[EventEnvelope[String]] {
+    private val log = LoggerFactory.getLogger(getClass)
+
+    override def process(envelope: EventEnvelope[String]): Future[Done] = {
+      log.debug2("{} Processed {}", projectionId.key, envelope.event)
+      probe ! Processed(projectionId, envelope)
+      Future.successful(Done)
+    }
+  }
+
+  class TestR2dbcHandler(projectionId: ProjectionId, probe: 
ActorRef[Processed])
+      extends R2dbcHandler[EventEnvelope[String]] {
+    private val log = LoggerFactory.getLogger(getClass)
+
+    override def process(session: R2dbcSession, envelope: 
EventEnvelope[String]): Future[Done] = {
+      log.debug2("{} Processed {}", projectionId.key, envelope.event)
+      probe ! Processed(projectionId, envelope)
+      Future.successful(Done)
+    }
+  }
+}
+
+class IntegrationSpec(testContainerConf: TestContainerConf)
+    extends 
ScalaTestWithActorTestKit(IntegrationSpec.config.withFallback(testContainerConf.config))
+    with AnyWordSpecLike
+    with TestDbLifecycle
+    with TestData
+    with BeforeAndAfterAll
+    with LogCapturing {
+  import IntegrationSpec._
+
+  def this() = this(new TestContainerConf)
+
+  override def typedSystem: ActorSystem[_] = system
+  private implicit val ec: ExecutionContext = system.executionContext
+  private val numberOfTests = 4
+
+  // needs to be unique per test case and known up front for setting up the 
producer
+  case class TestSource(entityType: String, streamId: String, pid: 
PersistenceId)
+  private val testSources = (1 to numberOfTests).map { n =>
+    val entityType = nextEntityType()
+    val streamId = s"stream_id_$n"
+    val pid = nextPid(entityType) // consuming side pid still has entity type
+    TestSource(entityType, streamId, pid)
+  }
+  private val testSourceIterator = testSources.iterator
+
+  class TestFixture {
+    val testSource = testSourceIterator.next()
+    def streamId = testSource.streamId
+    def pid = testSource.pid
+    val sliceRange = 0 to 1023
+    val projectionId = randomProjectionId()
+
+    val replyProbe = createTestProbe[Done]()
+    val processedProbe = createTestProbe[Processed]()
+
+    lazy val entity = spawn(TestEntity(pid))
+
+    private def sourceProvider =
+      EventSourcedProvider.eventsBySlices[String](
+        system,
+        GrpcReadJournal(
+          GrpcQuerySettings(streamId).withAdditionalRequestMetadata(
+            new MetadataBuilder().addText("x-secret", "top_secret").build()),
+          GrpcClientSettings
+            .connectToServiceAt("127.0.0.1", grpcPort)
+            .withTls(false),
+          protobufDescriptors = Nil),
+        // FIXME: error prone that it needs to be passed both to 
GrpcReadJournal and here?
+        // but on the consuming side we don't know about the producing side 
entity types
+        streamId,
+        sliceRange.min,
+        sliceRange.max)
+
+    def spawnAtLeastOnceProjection(): ActorRef[ProjectionBehavior.Command] =
+      spawn(
+        ProjectionBehavior(
+          R2dbcProjection.atLeastOnceAsync(
+            projectionId,
+            settings = None,
+            sourceProvider = sourceProvider,
+            handler = () => new TestHandler(projectionId, 
processedProbe.ref))))
+
+    def spawnExactlyOnceProjection(): ActorRef[ProjectionBehavior.Command] =
+      spawn(
+        ProjectionBehavior(
+          R2dbcProjection.exactlyOnce(
+            projectionId,
+            settings = None,
+            sourceProvider = sourceProvider,
+            handler = () => new TestR2dbcHandler(projectionId, 
processedProbe.ref))))
+
+  }
+
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+
+    val transformation =
+      Transformation.empty.registerAsyncMapper((event: String) => {
+        if (event.contains("*"))
+          Future.successful(None)
+        else
+          Future.successful(Some(event.toUpperCase))
+      })
+
+    val eventProducerSources = testSources
+      .map(source =>
+        EventProducerSource(source.entityType, source.streamId, 
transformation, EventProducerSettings(system)))
+      .toSet
+
+    val authInterceptor = new EventProducerInterceptor {
+      def intercept(streamId: String, requestMetadata: Metadata): Future[Done] 
= {
+        if (!requestMetadata.getText("x-secret").contains("top_secret"))
+          throw new GrpcServiceException(Status.PERMISSION_DENIED)
+        else Future.successful(Done)
+      }
+    }
+
+    val eventProducerService =
+      EventProducer.grpcServiceHandler(eventProducerSources, 
Some(authInterceptor))
+
+    val service: HttpRequest => Future[HttpResponse] =
+      ServiceHandler.concatOrNotFound(eventProducerService)
+
+    val bound =
+      Http()
+        .newServerAt("127.0.0.1", grpcPort)
+        .bind(service)
+        .map(_.addToCoordinatedShutdown(3.seconds))
+
+    bound.futureValue
+  }
+
+  protected override def afterAll(): Unit = {
+    try super.afterAll()
+    finally testContainerConf.stop() // runs even if super.afterAll() throws
+  }
+
+  "A gRPC Projection" must {
+    "receive events" in new TestFixture {
+      entity ! TestEntity.Persist("a")
+      entity ! TestEntity.Persist("b")
+      entity ! TestEntity.Ping(replyProbe.ref)
+      replyProbe.receiveMessage()
+
+      // start the projection
+      val projection = spawnAtLeastOnceProjection()
+
+      val processedA = processedProbe.receiveMessage()
+      processedA.envelope.persistenceId shouldBe pid.id
+      processedA.envelope.sequenceNr shouldBe 1L
+      processedA.envelope.event shouldBe "A"
+
+      val processedB = processedProbe.receiveMessage()
+      processedB.envelope.persistenceId shouldBe pid.id
+      processedB.envelope.sequenceNr shouldBe 2L
+      processedB.envelope.event shouldBe "B"
+
+      entity ! TestEntity.Persist("c")
+      val processedC = processedProbe.receiveMessage()
+      processedC.envelope.persistenceId shouldBe pid.id
+      processedC.envelope.sequenceNr shouldBe 3L
+      processedC.envelope.event shouldBe "C"
+
+      projection ! ProjectionBehavior.Stop
+      entity ! TestEntity.Stop(replyProbe.ref)
+      processedProbe.expectTerminated(projection)
+      processedProbe.expectTerminated(entity)
+    }
+
+    "filter out events" in new TestFixture {
+      entity ! TestEntity.Persist("a")
+      entity ! TestEntity.Persist("b*")
+      entity ! TestEntity.Persist("c")
+      entity ! TestEntity.Ping(replyProbe.ref)
+      replyProbe.receiveMessage()
+
+      // start the projection
+      val projection = spawnAtLeastOnceProjection()
+
+      val processedA = processedProbe.receiveMessage()
+      processedA.envelope.persistenceId shouldBe pid.id
+      processedA.envelope.sequenceNr shouldBe 1L
+      processedA.envelope.event shouldBe "A"
+
+      // b* is filtered out by the registered transformation
+
+      val processedC = processedProbe.receiveMessage()
+      processedC.envelope.persistenceId shouldBe pid.id
+      processedC.envelope.sequenceNr shouldBe 3L
+      processedC.envelope.event shouldBe "C"
+
+      projection ! ProjectionBehavior.Stop
+      entity ! TestEntity.Stop(replyProbe.ref)
+
+      processedProbe.expectTerminated(projection)
+      processedProbe.expectTerminated(entity)
+    }
+
+    "resume from offset" in new TestFixture {
+      entity ! TestEntity.Persist("a")
+      entity ! TestEntity.Persist("b")
+      entity ! TestEntity.Ping(replyProbe.ref)
+      replyProbe.receiveMessage()
+
+      // start the projection
+      val projection = spawnExactlyOnceProjection()
+
+      processedProbe.receiveMessage().envelope.event shouldBe "A"
+      processedProbe.receiveMessage().envelope.event shouldBe "B"
+      processedProbe.expectNoMessage()
+
+      projection ! ProjectionBehavior.Stop
+      processedProbe.expectTerminated(projection)
+      // start new projection
+      val projection2 = spawnExactlyOnceProjection()
+
+      entity ! TestEntity.Persist("c")
+      processedProbe.receiveMessage().envelope.event shouldBe "C"
+
+      processedProbe.expectNoMessage()
+      projection2 ! ProjectionBehavior.Stop
+      entity ! TestEntity.Stop(replyProbe.ref)
+
+      processedProbe.expectTerminated(projection2)
+      processedProbe.expectTerminated(entity)
+    }
+
+    "deduplicate backtracking events" in new TestFixture {
+      entity ! TestEntity.Persist("a")
+      entity ! TestEntity.Persist("b")
+      entity ! TestEntity.Persist("c")
+      entity ! TestEntity.Ping(replyProbe.ref)
+      replyProbe.receiveMessage()
+
+      def expectedLogMessage(seqNr: Long): String =
+        s"Received backtracking event from [127.0.0.1] persistenceId 
[${pid.id}] with seqNr [$seqNr]"
+      val projection =
+        LoggingTestKit.trace(expectedLogMessage(1)).expect {
+          LoggingTestKit.trace(expectedLogMessage(2)).expect {
+            LoggingTestKit.trace(expectedLogMessage(3)).expect {
+              // start the projection
+              spawnExactlyOnceProjection()
+            }
+          }
+        }
+
+      processedProbe.receiveMessage().envelope.event shouldBe "A"
+      processedProbe.receiveMessage().envelope.event shouldBe "B"
+      processedProbe.receiveMessage().envelope.event shouldBe "C"
+
+      processedProbe.expectNoMessage()
+      projection ! ProjectionBehavior.Stop
+      entity ! TestEntity.Stop(replyProbe.ref)
+
+      processedProbe.expectTerminated(projection)
+      processedProbe.expectTerminated(entity)
+    }
+  }
+
+}
diff --git 
a/grpc-int-test/src/test/scala/org/apache/pekko/projection/grpc/TestContainerConf.scala
 
b/grpc-int-test/src/test/scala/org/apache/pekko/projection/grpc/TestContainerConf.scala
new file mode 100644
index 0000000..d0e78ea
--- /dev/null
+++ 
b/grpc-int-test/src/test/scala/org/apache/pekko/projection/grpc/TestContainerConf.scala
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2009-2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.projection.grpc
+
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
+import org.apache.pekko.testkit.SocketUtil
+import org.testcontainers.containers.startupcheck.IsRunningStartupCheckStrategy
+import org.testcontainers.postgresql.PostgreSQLContainer
+
+class TestContainerConf {
+  val grpcPort: Int = SocketUtil.temporaryServerAddress("127.0.0.1").getPort
+
+  private val container = new PostgreSQLContainer("postgres:18.0")
+  container.withInitScript("db/default-init.sql")
+  container.withStartupCheckStrategy(new IsRunningStartupCheckStrategy)
+  container.withStartupAttempts(5)
+  container.start()
+
+  def config: Config =
+    ConfigFactory
+      .parseString(s"""
+     pekko.http.server.preview.enable-http2 = on
+     pekko.projection.grpc {
+       consumer.client {
+         host = "127.0.0.1"
+         port = $grpcPort
+         use-tls = false
+       }
+       producer {
+         query-plugin-id = "pekko.persistence.r2dbc.query"
+       }
+     }
+     pekko.persistence.r2dbc {
+       # yugabyte or postgres
+       dialect = "postgres"
+       connection-factory {
+         driver = "postgres"
+         host = "${container.getHost}"
+         port = ${container.getMappedPort(PostgreSQLContainer.POSTGRESQL_PORT)}
+         database = "${container.getDatabaseName}"
+         user = "${container.getUsername}"
+         password = "${container.getPassword}"
+       }
+     }
+     """)
+      .withFallback(ConfigFactory.load("persistence.conf"))
+
+  def stop(): Unit = container.stop()
+}
diff --git 
a/grpc-int-test/src/test/scala/org/apache/pekko/projection/grpc/TestData.scala 
b/grpc-int-test/src/test/scala/org/apache/pekko/projection/grpc/TestData.scala
new file mode 100644
index 0000000..b1eeb89
--- /dev/null
+++ 
b/grpc-int-test/src/test/scala/org/apache/pekko/projection/grpc/TestData.scala
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.projection.grpc
+
+import java.util.UUID
+import java.util.concurrent.atomic.AtomicLong
+
+import org.apache.pekko.persistence.typed.PersistenceId
+import org.apache.pekko.projection.ProjectionId
+
+object TestData {
+  private val start =
+    0L // could be something more unique, like currentTimeMillis
+  private val pidCounter = new AtomicLong(start)
+  private val entityTypeCounter = new AtomicLong(start)
+}
+
+trait TestData {
+  import TestData.entityTypeCounter
+  import TestData.pidCounter
+
+  def nextPid(entityType: String): PersistenceId =
+    PersistenceId(entityType, s"p-${pidCounter.incrementAndGet()}")
+
+  def nextEntityType() = s"TestEntity-${entityTypeCounter.incrementAndGet()}"
+
+  def randomProjectionId(): ProjectionId =
+    ProjectionId(UUID.randomUUID().toString, "00")
+
+}
diff --git 
a/grpc-int-test/src/test/scala/org/apache/pekko/projection/grpc/TestDbLifecycle.scala
 
b/grpc-int-test/src/test/scala/org/apache/pekko/projection/grpc/TestDbLifecycle.scala
new file mode 100644
index 0000000..44884d6
--- /dev/null
+++ 
b/grpc-int-test/src/test/scala/org/apache/pekko/projection/grpc/TestDbLifecycle.scala
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2009-2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.projection.grpc
+
+import scala.concurrent.Await
+import scala.concurrent.duration._
+
+import org.apache.pekko
+import pekko.actor.typed.ActorSystem
+import pekko.persistence.Persistence
+import pekko.persistence.r2dbc.ConnectionFactoryProvider
+import pekko.persistence.r2dbc.JournalSettings
+import pekko.persistence.r2dbc.internal.R2dbcExecutor
+import pekko.projection.r2dbc.R2dbcProjectionSettings
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.Suite
+import org.slf4j.LoggerFactory
+
+trait TestDbLifecycle extends BeforeAndAfterAll { this: Suite =>
+
+  def typedSystem: ActorSystem[_]
+
+  def testConfigPath: String = "pekko.projection.r2dbc"
+
+  lazy val r2dbcProjectionSettings: R2dbcProjectionSettings =
+    
R2dbcProjectionSettings(typedSystem.settings.config.getConfig(testConfigPath))
+
+  lazy val r2dbcExecutor: R2dbcExecutor = {
+    new R2dbcExecutor(
+      
ConnectionFactoryProvider(typedSystem).connectionFactoryFor(r2dbcProjectionSettings.useConnectionFactory),
+      LoggerFactory.getLogger(getClass),
+      
r2dbcProjectionSettings.logDbCallsExceeding)(typedSystem.executionContext, 
typedSystem)
+  }
+
+  lazy val persistenceExt: Persistence = Persistence(typedSystem)
+
+  override protected def beforeAll(): Unit = {
+    lazy val journalSettings: JournalSettings =
+      
JournalSettings(typedSystem.settings.config.getConfig("pekko.persistence.r2dbc.journal"))
+    Await.result(
+      r2dbcExecutor.updateOne("beforeAll delete")(
+        _.createStatement(s"delete from 
${journalSettings.journalTableWithSchema}")),
+      10.seconds)
+    Await.result(
+      r2dbcExecutor.updateOne("beforeAll delete")(
+        _.createStatement(s"delete from 
${r2dbcProjectionSettings.timestampOffsetTableWithSchema}")),
+      10.seconds)
+    super.beforeAll()
+  }
+
+}
diff --git 
a/grpc-int-test/src/test/scala/org/apache/pekko/projection/grpc/TestEntity.scala
 
b/grpc-int-test/src/test/scala/org/apache/pekko/projection/grpc/TestEntity.scala
new file mode 100644
index 0000000..964842f
--- /dev/null
+++ 
b/grpc-int-test/src/test/scala/org/apache/pekko/projection/grpc/TestEntity.scala
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2009-2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.projection.grpc
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.typed.ActorRef
+import pekko.actor.typed.Behavior
+import pekko.actor.typed.scaladsl.Behaviors
+import pekko.actor.typed.scaladsl.LoggerOps
+import pekko.persistence.typed.PersistenceId
+import pekko.persistence.typed.scaladsl.Effect
+import pekko.persistence.typed.scaladsl.EventSourcedBehavior
+
+/**
+ * Small event sourced entity for tests.
+ *
+ * `Persist` stores its payload as an event, `Ping` replies with `Done` without persisting
+ * anything, and `Stop` replies with `Done` and then stops the entity. The state is always the
+ * empty string.
+ */
+object TestEntity {
+  sealed trait Command
+  final case class Persist(payload: Any) extends Command
+  final case class Ping(replyTo: ActorRef[Done]) extends Command
+  final case class Stop(replyTo: ActorRef[Done]) extends Command
+
+  /** Creates the behavior for the entity identified by `pid`. */
+  def apply(pid: PersistenceId): Behavior[Command] =
+    Behaviors.setup { ctx =>
+      val onCommand: (String, Command) => Effect[Any, String] = { (_, cmd) =>
+        cmd match {
+          case Persist(payload) =>
+            // The logged seqNr is the one the event being persisted will get.
+            val nextSeqNr = EventSourcedBehavior.lastSequenceNumber(ctx) + 1
+            ctx.log.debugN("Persist [{}], pid [{}], seqNr [{}]", payload, pid.id, nextSeqNr)
+            Effect.persist(payload)
+          case Ping(replyTo) =>
+            replyTo ! Done
+            Effect.none
+          case Stop(replyTo) =>
+            replyTo ! Done
+            Effect.stop()
+        }
+      }
+
+      // Events never change the state.
+      val onEvent: (String, Any) => String = (_, _) => ""
+
+      EventSourcedBehavior[Command, Any, String](
+        persistenceId = pid,
+        emptyState = "",
+        commandHandler = onCommand,
+        eventHandler = onEvent)
+    }
+}
diff --git 
a/grpc-int-test/src/test/scala/org/apache/pekko/projection/grpc/consumer/scaladsl/EventTimestampQuerySpec.scala
 
b/grpc-int-test/src/test/scala/org/apache/pekko/projection/grpc/consumer/scaladsl/EventTimestampQuerySpec.scala
new file mode 100644
index 0000000..b03a7f4
--- /dev/null
+++ 
b/grpc-int-test/src/test/scala/org/apache/pekko/projection/grpc/consumer/scaladsl/EventTimestampQuerySpec.scala
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.projection.grpc.consumer.scaladsl
+
+import java.time.Instant
+import java.time.{ Duration => JDuration }
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.concurrent.duration._
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.testkit.typed.scaladsl.LogCapturing
+import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
+import pekko.actor.typed.ActorSystem
+import pekko.grpc.GrpcClientSettings
+import pekko.grpc.scaladsl.ServiceHandler
+import pekko.http.scaladsl.Http
+import pekko.http.scaladsl.model.HttpRequest
+import pekko.http.scaladsl.model.HttpResponse
+import pekko.projection.grpc.TestContainerConf
+import pekko.projection.grpc.TestData
+import pekko.projection.grpc.TestDbLifecycle
+import pekko.projection.grpc.TestEntity
+import pekko.projection.grpc.consumer.GrpcQuerySettings
+import pekko.projection.grpc.producer.EventProducerSettings
+import pekko.projection.grpc.producer.scaladsl.EventProducer
+import 
pekko.projection.grpc.producer.scaladsl.EventProducer.EventProducerSource
+import pekko.projection.grpc.producer.scaladsl.EventProducer.Transformation
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.wordspec.AnyWordSpecLike
+
+/**
+ * Integration spec for `timestampOf` on `GrpcReadJournal`.
+ *
+ * `beforeAll` binds an event producer gRPC service on `127.0.0.1` at `testContainerConf.grpcPort`.
+ * Each test uses its own `TestFixture` with a fresh persistence id.
+ */
+class EventTimestampQuerySpec(testContainerConf: TestContainerConf)
+    extends ScalaTestWithActorTestKit(testContainerConf.config)
+    with AnyWordSpecLike
+    with TestDbLifecycle
+    with TestData
+    with BeforeAndAfterAll
+    with LogCapturing {
+
+  def this() = this(new TestContainerConf)
+
+  protected override def afterAll(): Unit = {
+    // Run testContainerConf.stop() even when shutting down the testkit throws, so that the
+    // resource it manages is not left behind by a failed shutdown.
+    try super.afterAll()
+    finally testContainerConf.stop()
+  }
+
+  override def typedSystem: ActorSystem[_] = system
+  private implicit val ec: ExecutionContext = system.executionContext
+  private val entityType = nextEntityType()
+  private val streamId = "stream_id_" + entityType
+
+  /** Per-test state: a new persistence id, a reply probe, and lazily created entity and journal. */
+  class TestFixture {
+    val pid = nextPid(entityType)
+
+    val replyProbe = createTestProbe[Done]()
+
+    lazy val entity = spawn(TestEntity(pid))
+
+    // NOTE(review): the client settings come from configuration; presumably they point at the
+    // port bound in beforeAll - confirm against TestContainerConf.
+    lazy val grpcReadJournal = GrpcReadJournal(
+      GrpcQuerySettings(streamId),
+      GrpcClientSettings.fromConfig(system.settings.config.getConfig("pekko.projection.grpc.consumer.client")),
+      protobufDescriptors = Nil)
+
+    /** Timestamp of the event at `seqNr` for this fixture's pid; fails the test when there is none. */
+    def storedTimestamp(seqNr: Long): Instant =
+      grpcReadJournal
+        .timestampOf(pid.id, sequenceNr = seqNr)
+        .futureValue
+        .getOrElse(fail(s"No timestamp found for pid [${pid.id}], seqNr [$seqNr]"))
+  }
+
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+
+    val eventProducerSource =
+      EventProducerSource(entityType, streamId, Transformation.identity, EventProducerSettings(system))
+
+    val eventProducerService =
+      EventProducer.grpcServiceHandler(eventProducerSource)
+
+    val service: HttpRequest => Future[HttpResponse] =
+      ServiceHandler.concatOrNotFound(eventProducerService)
+
+    val bound =
+      Http()
+        .newServerAt("127.0.0.1", testContainerConf.grpcPort)
+        .bind(service)
+        .map(_.addToCoordinatedShutdown(3.seconds))
+
+    // Wait for the server binding to complete before any test runs.
+    bound.futureValue
+  }
+
+  "GrpcReadJournal with EventTimestampQuery" must {
+    "lookup event timestamp" in new TestFixture {
+      entity ! TestEntity.Persist("a")
+      entity ! TestEntity.Persist("b")
+      // Ping is sent after both Persist commands; its reply is awaited before querying.
+      entity ! TestEntity.Ping(replyProbe.ref)
+      replyProbe.receiveMessage()
+
+      val timestampA = storedTimestamp(1L)
+      JDuration
+        .between(timestampA, Instant.now())
+        .toMillis should (be >= 0L and be <= 3000L)
+
+      val timestampB = storedTimestamp(2L)
+      JDuration
+        .between(timestampB, Instant.now())
+        .toMillis should (be >= 0L and be <= 3000L)
+
+      // Equal timestamps are accepted; B must just not be earlier than A.
+      timestampB.isBefore(timestampA) shouldBe false
+    }
+
+    "handle missing event as None" in new TestFixture {
+      grpcReadJournal
+        .timestampOf(pid.id, sequenceNr = 13L)
+        .futureValue
+        .isEmpty shouldBe true
+    }
+  }
+
+}
diff --git 
a/grpc-int-test/src/test/scala/org/apache/pekko/projection/grpc/consumer/scaladsl/LoadEventQuerySpec.scala
 
b/grpc-int-test/src/test/scala/org/apache/pekko/projection/grpc/consumer/scaladsl/LoadEventQuerySpec.scala
new file mode 100644
index 0000000..0216507
--- /dev/null
+++ 
b/grpc-int-test/src/test/scala/org/apache/pekko/projection/grpc/consumer/scaladsl/LoadEventQuerySpec.scala
@@ -0,0 +1,149 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.projection.grpc.consumer.scaladsl
+
+import scala.concurrent.Await
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.concurrent.duration._
+
+import org.apache.pekko
+import pekko.Done
+import pekko.NotUsed
+import pekko.actor.testkit.typed.scaladsl.LogCapturing
+import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
+import pekko.actor.typed.ActorSystem
+import pekko.grpc.GrpcClientSettings
+import pekko.grpc.scaladsl.ServiceHandler
+import pekko.http.scaladsl.Http
+import pekko.http.scaladsl.model.HttpRequest
+import pekko.http.scaladsl.model.HttpResponse
+import pekko.projection.grpc.TestContainerConf
+import pekko.projection.grpc.TestDbLifecycle
+import pekko.projection.grpc.TestEntity
+import pekko.projection.grpc.TestData
+import pekko.projection.grpc.consumer.GrpcQuerySettings
+import pekko.projection.grpc.producer.EventProducerSettings
+import pekko.projection.grpc.producer.scaladsl.EventProducer
+import 
pekko.projection.grpc.producer.scaladsl.EventProducer.EventProducerSource
+import pekko.projection.grpc.producer.scaladsl.EventProducer.Transformation
+import io.grpc.Status
+import io.grpc.StatusRuntimeException
+import org.scalatest.BeforeAndAfterAll
+import org.scalatest.wordspec.AnyWordSpecLike
+
+/**
+ * Integration spec for `loadEnvelope` on `GrpcReadJournal`.
+ *
+ * `beforeAll` binds an event producer gRPC service on `127.0.0.1` at `testContainerConf.grpcPort`.
+ * Its transformation filters out String events containing `*` and upper-cases the others.
+ * Each test connects to that port through the journal created by its own `TestFixture`.
+ */
+class LoadEventQuerySpec(testContainerConf: TestContainerConf)
+    extends ScalaTestWithActorTestKit(testContainerConf.config)
+    with AnyWordSpecLike
+    with TestDbLifecycle
+    with TestData
+    with BeforeAndAfterAll
+    with LogCapturing {
+
+  def this() = this(new TestContainerConf)
+
+  override def typedSystem: ActorSystem[_] = system
+  private implicit val ec: ExecutionContext = system.executionContext
+  private val entityType = nextEntityType()
+  private val streamId = "stream_id_" + entityType
+
+  protected override def afterAll(): Unit = {
+    // Run testContainerConf.stop() even when shutting down the testkit throws, so that the
+    // resource it manages is not left behind by a failed shutdown.
+    try super.afterAll()
+    finally testContainerConf.stop()
+  }
+
+  /** Per-test state: a reply probe, a new persistence id, and lazily created entity and journal. */
+  class TestFixture {
+
+    val replyProbe = createTestProbe[Done]()
+    val pid = nextPid(entityType)
+
+    lazy val entity = spawn(TestEntity(pid))
+
+    lazy val grpcReadJournal = GrpcReadJournal(
+      GrpcQuerySettings(streamId),
+      GrpcClientSettings
+        .connectToServiceAt("127.0.0.1", testContainerConf.grpcPort)
+        .withTls(false),
+      protobufDescriptors = Nil)
+  }
+
+  override protected def beforeAll(): Unit = {
+    super.beforeAll()
+
+    // Events containing "*" are mapped to None (filtered), all others are upper-cased.
+    val transformation =
+      Transformation.empty.registerAsyncMapper((event: String) => {
+        if (event.contains("*"))
+          Future.successful(None)
+        else
+          Future.successful(Some(event.toUpperCase))
+      })
+
+    val eventProducerSource = EventProducerSource(entityType, streamId, transformation, EventProducerSettings(system))
+
+    val eventProducerService =
+      EventProducer.grpcServiceHandler(eventProducerSource)
+
+    val service: HttpRequest => Future[HttpResponse] =
+      ServiceHandler.concatOrNotFound(eventProducerService)
+
+    val bound =
+      Http()
+        .newServerAt("127.0.0.1", testContainerConf.grpcPort)
+        .bind(service)
+        .map(_.addToCoordinatedShutdown(3.seconds))
+
+    // Wait for the server binding to complete before any test runs.
+    bound.futureValue
+  }
+
+  "GrpcReadJournal with LoadEventQuery" must {
+    "load event" in new TestFixture {
+      entity ! TestEntity.Persist("a")
+      entity ! TestEntity.Persist("b")
+      // Ping is sent after both Persist commands; its reply is awaited before querying.
+      entity ! TestEntity.Ping(replyProbe.ref)
+      replyProbe.receiveMessage()
+
+      grpcReadJournal
+        .loadEnvelope[String](pid.id, sequenceNr = 1L)
+        .futureValue
+        .event shouldBe "A"
+
+      grpcReadJournal
+        .loadEnvelope[String](pid.id, sequenceNr = 2L)
+        .futureValue
+        .event shouldBe "B"
+    }
+
+    "load filtered event" in new TestFixture {
+      entity ! TestEntity.Persist("a*")
+      entity ! TestEntity.Ping(replyProbe.ref)
+      replyProbe.receiveMessage()
+
+      val env = grpcReadJournal
+        .loadEnvelope[String](pid.id, sequenceNr = 1L)
+        .futureValue
+      env.eventOption.isEmpty shouldBe true
+      env.eventMetadata shouldBe Some(NotUsed)
+    }
+
+    "handle missing event as NOT_FOUND" in new TestFixture {
+      // intercept already fails the test when nothing is thrown, so no explicit fail() is
+      // needed after the call.
+      val status =
+        intercept[StatusRuntimeException] {
+          Await.result(grpcReadJournal.loadEnvelope[String](pid.id, sequenceNr = 13L), replyProbe.remainingOrDefault)
+        }.getStatus
+      status.getCode shouldBe Status.NOT_FOUND.getCode
+    }
+  }
+
+}
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 5c62403..f283691 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -26,6 +26,7 @@ object Dependencies {
     val pekko = PekkoCoreDependency.version
     val pekkoGrpc = org.apache.pekko.grpc.gen.BuildInfo.version
     val pekkoPersistenceJdbc = "2.0.0-M1"
+    val pekkoPersistenceR2dbc = "2.0.0-M0+109-880ba593-SNAPSHOT"
     val pekkoPersistenceCassandra = "1.1.0"
     val connectors = PekkoConnectorsDependency.version
     val connectorsKafka = PekkoConnectorsKafkaDependency.version
@@ -212,6 +213,18 @@ object Dependencies {
       Test.logback % "test",
       Test.scalatest % "test")
 
+  val grpcIntTest =
+    deps ++= Seq(
+      "org.apache.pekko" %% "pekko-persistence-r2dbc" % 
Versions.pekkoPersistenceR2dbc % "test",
+      "org.apache.pekko" %% "pekko-projection-r2dbc" % 
Versions.pekkoPersistenceR2dbc % "test",
+      "org.postgresql" % "r2dbc-postgresql" % "1.1.1.RELEASE" % "test",
+      Test.postgresDriver % "test",
+      Test.pekkoSerializationJackson,
+      Test.pekkoTypedTestkit % "test",
+      Test.postgresContainer % "test",
+      Test.logback % "test",
+      Test.scalatest % "test")
+
   val kafkaTest =
     deps ++= Seq(
       Test.scalatest % "test",


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to