This is an automated email from the ASF dual-hosted git repository.

pjfanning pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-projection.git


The following commit(s) were added to refs/heads/main by this push:
     new 9f5a4ec  add r2dbc docs (#489)
9f5a4ec is described below

commit 9f5a4ec0edeefd89a0944630a7dc649d20a31357
Author: PJ Fanning <[email protected]>
AuthorDate: Sat May 23 11:34:08 2026 +0100

    add r2dbc docs (#489)
    
    * r2dbc docs
    
    * Update build.sbt
    
    * Update R2dbcProjectionDocExample.java
    
    * Update R2dbcProjectionDocExample.scala
    
    * Update R2dbcProjectionDocExample.scala
    
    * Update R2dbcProjectionDocExample.scala
---
 .github/workflows/integration-tests-r2dbc.yml      |   6 +-
 build.sbt                                          |   7 +
 docs/src/main/paradox/index.md                     |   3 +-
 docs/src/main/paradox/r2dbc.md                     | 275 ++++++++++++++++++++
 project/Dependencies.scala                         | 200 +++++++--------
 .../ddl-scripts}/create_tables_postgres.sql        |   0
 .../ddl-scripts/create_tables_yugabyte.sql         |  17 +-
 .../home/projection/R2dbcProjectionDocExample.java | 283 +++++++++++++++++++++
 .../test/scala/docs/home/CborSerializable.scala    |  16 ++
 .../projection/R2dbcProjectionDocExample.scala     | 274 ++++++++++++++++++++
 10 files changed, 967 insertions(+), 114 deletions(-)

diff --git a/.github/workflows/integration-tests-r2dbc.yml 
b/.github/workflows/integration-tests-r2dbc.yml
index 3249015..5d98546 100644
--- a/.github/workflows/integration-tests-r2dbc.yml
+++ b/.github/workflows/integration-tests-r2dbc.yml
@@ -58,11 +58,11 @@ jobs:
           docker compose -f docker/docker-compose-postgres.yml up -d
           # TODO: could we poll the port instead of sleep?
           sleep 10
-          docker exec -i docker-postgres-db-1 psql -U postgres -t < 
ddl-scripts/create_tables_postgres.sql
+          docker exec -i docker-postgres-db-1 psql -U postgres -t < 
r2dbc-int-test/ddl-scripts/create_tables_postgres.sql
           docker exec -i docker-postgres-db-1 psql -U postgres -t -c 'CREATE 
DATABASE database1;'
-          docker exec -i docker-postgres-db-1 psql -U postgres -t -d database1 
< ddl-scripts/create_tables_postgres.sql
+          docker exec -i docker-postgres-db-1 psql -U postgres -t -d database1 
< r2dbc-int-test/ddl-scripts/create_tables_postgres.sql
           docker exec -i docker-postgres-db-1 psql -U postgres -t -c 'CREATE 
DATABASE database2;'
-          docker exec -i docker-postgres-db-1 psql -U postgres -t -d database2 
< ddl-scripts/create_tables_postgres.sql
+          docker exec -i docker-postgres-db-1 psql -U postgres -t -d database2 
< r2dbc-int-test/ddl-scripts/create_tables_postgres.sql
 
       - name: Run all integration tests with default Scala and Java ${{ 
matrix.java-version }}
         run: sbt "r2dbc-int-test/test" ${{ matrix.extraOpts }}
diff --git a/build.sbt b/build.sbt
index f5f18cf..9d5e938 100644
--- a/build.sbt
+++ b/build.sbt
@@ -203,6 +203,7 @@ lazy val grpcTest =
       Test / fork := true,
       Test / javaOptions += 
"--add-opens=java.base/jdk.internal.misc=ALL-UNNAMED")
     .dependsOn(grpc % "compile;test->compile")
+    .dependsOn(r2dbc % Test)
     .dependsOn(testkit % Test)
 
 lazy val grpcIntTest =
@@ -297,6 +298,12 @@ lazy val docs = project
       "scaladoc.pekko.base_url" -> 
s"https://pekko.apache.org/api/pekko/${Dependencies.PekkoVersionInDocs}/";,
       "javadoc.pekko.base_url" -> 
s"https://pekko.apache.org/japi/pekko/${Dependencies.PekkoVersionInDocs}/";,
       "javadoc.pekko.link_style" -> "direct",
+      "extref.pekko-persistence-r2dbc.base_url" ->
+      
s"https://pekko.apache.org/docs/pekko-persistence-r2dbc/${Dependencies.PekkoPersistenceR2dbcVersionInDocs}/%s";,
+      "scaladoc.org.apache.pekko.persistence.r2dbc.base_url" ->
+      
s"https://pekko.apache.org/api/pekko-persistence-r2dbc/${Dependencies.PekkoPersistenceR2dbcVersionInDocs}";,
+      "javadoc.org.apache.pekko.persistence.r2dbc.base_url" ->
+      
s"https://pekko.apache.org/japi/pekko-persistence-r2dbc/${Dependencies.PekkoPersistenceR2dbcVersionInDocs}";,
       // Java
       "javadoc.base_url" -> "https://docs.oracle.com/javase/8/docs/api/";,
       // Scala
diff --git a/docs/src/main/paradox/index.md b/docs/src/main/paradox/index.md
index c94425e..6c07a76 100644
--- a/docs/src/main/paradox/index.md
+++ b/docs/src/main/paradox/index.md
@@ -10,7 +10,7 @@
 * [Event Sourced](eventsourced.md)
 * [Durable State](durable-state.md)
 * [Kafka](kafka.md)
-* [gRPC](grpc.md)
+* [R2DBC](r2dbc.md)
 * [Cassandra](cassandra.md)
 * [JDBC](jdbc.md)
 * [Slick](slick.md)
@@ -19,6 +19,7 @@
 * [Flow](flow.md)
 * [Error](error.md)
 * [Projection Settings](projection-settings.md)
+* [gRPC](grpc.md)
 * [Management](management.md)
 * [Testing](testing.md)
 * [Classic](classic.md)
diff --git a/docs/src/main/paradox/r2dbc.md b/docs/src/main/paradox/r2dbc.md
new file mode 100644
index 0000000..6cc2f85
--- /dev/null
+++ b/docs/src/main/paradox/r2dbc.md
@@ -0,0 +1,275 @@
+# Offset in a relational DB with R2DBC
+
+The @apidoc[R2dbcProjection$] has support for storing the offset in a 
relational database using R2DBC via @extref:[Apache Pekko Persistence 
R2DBC](pekko-persistence-r2dbc:overview.html).
+
+The source of the envelopes is from a `SourceProvider`, which can be:
+
+* events from Event Sourced entities via the @ref:[SourceProvider for 
eventsBySlices](eventsourced.md#sourceprovider-for-eventsbyslices) with the 
@extref:[eventsBySlices 
query](pekko-persistence-r2dbc:query.html#eventsbyslices)
+* state changes for Durable State entities via the @ref:[SourceProvider for 
changesBySlices](durable-state.md#sourceprovider-for-changesbyslices) with the 
@extref:[changesBySlices 
query](pekko-persistence-r2dbc:query.html#changesbyslices)
+* any other `SourceProvider` with supported @ref:[offset types](#offset-types)
+
+A @apidoc[R2dbcHandler] receives a @apidoc[pekko.projection.*.R2dbcSession] 
instance and an envelope. The
+`R2dbcSession` provides the means to access an open R2DBC connection that can 
be used to process the envelope.
+The target database operations can be run in the same transaction as the 
storage of the offset, which means
+that @ref:[exactly-once](#exactly-once) processing semantics is supported. It 
also offers
+@ref:[at-least-once](#at-least-once) semantics.
+
+## Dependencies
+
+To use the R2DBC module of Pekko Projections add the following dependency in 
your project:
+
+@@dependency [Maven,sbt,Gradle] {
+group=org.apache.pekko
+artifact=pekko-projection-r2dbc_$scala.binary.version$
+version=$project.version$
+group2=org.apache.pekko
+artifact2=pekko-persistence-r2dbc_$scala.binary.version$
+version2=$pekko.r2dbc.version$
+}
+
+Pekko Projections R2DBC depends on Pekko $pekko.version$ or later, and note 
that it is important that all `pekko-*`
+dependencies are in the same version, so it is recommended to depend on them 
explicitly to avoid problems
+with transient dependencies causing an unlucky mix of versions.
+
+@@project-info{ projectId="r2dbc" }
+
+
+### Transitive dependencies
+
+The table below shows `pekko-projection-r2dbc`'s direct dependencies, and the 
second tab shows all libraries it depends on transitively.
+
+@@dependencies{ projectId="r2dbc" }
+
+## Schema
+
+The `projection_offset_store`, `projection_timestamp_offset_store` and 
`projection_management` tables
+need to be created in the configured database:
+
+PostgreSQL
+:  @@snip [PostgreSQL 
Schema](/r2dbc-int-test/ddl-scripts/create_tables_postgres.sql)
+
+YugaByte
+:  @@snip [YugaByte 
Schema](/r2dbc-int-test/ddl-scripts/create_tables_yugabyte.sql)
+
+## Configuration
+
+By default, `pekko-projection-r2dbc` uses the same connection pool and 
`dialect` as `pekko-persistence-r2dbc`, see
+@extref:[Connection 
configuration](pekko-persistence-r2dbc:config.html#connection-configuration).
+
+### Reference configuration
+
+The following can be overridden in your `application.conf` for the Projection 
specific settings:
+
+@@snip [reference.conf](/r2dbc/src/main/resources/reference.conf) 
{#projection-config}
+
+## Running with Sharded Daemon Process
+
+The Sharded Daemon Process can be used to distribute `n` instances of a given 
Projection across the cluster.
+Therefore, it's important that each Projection instance consumes a subset of 
the stream of envelopes.
+
+When using `eventsBySlices` the initialization code looks like this:
+
+Scala
+:  @@snip 
[R2dbcProjectionDocExample.scala](/r2dbc/src/test/scala/docs/home/projection/R2dbcProjectionDocExample.scala)
 { #initProjections }
+
+Java
+:  @@snip 
[R2dbcProjectionDocExample.java](/r2dbc/src/test/java/jdocs/home/projection/R2dbcProjectionDocExample.java)
 { #initProjections }
+
+The @ref:[`ShoppingCartHandler` is shown below](#handler).
+
+It is possible to dynamically scale the number of Projection instances as 
described in @extref:[Sharded Daemon Process 
documentation](pekko:typed/cluster-sharded-daemon-process.html#dynamic-scaling-of-number-of-workers).
 
+
+There are alternative ways of running the `ProjectionBehavior` as described in 
@ref:[Running a Projection](running.md), but note that when using the R2DBC 
plugin as `SourceProvider` it is recommended to use `eventsBySlices` and not 
`eventsByTag`.
+
+## Slices
+
+The `SourceProvider` for Event Sourced actors has historically been using 
`eventsByTag` but the R2DBC plugin is
+instead providing `eventsBySlices` as an improved solution.
+
+The usage of `eventsByTag` for Projections has the drawback that the number of 
tags must be decided
+up-front and can't easily be changed afterwards. Starting with too many tags 
means much overhead since
+many projection instances would be running on each node in a small Pekko 
Cluster. Each projection instance
+polling the database periodically. Starting with too few tags means that it 
can't be scaled later to more
+Pekko nodes.
+
+With `eventsBySlices` more Projection instances can be added when needed and 
still reuse the offsets
+for the previous slice distributions.
+
+A slice is deterministically defined based on the persistence id. The purpose 
is to evenly distribute all
+persistence ids over the slices. The `eventsBySlices` query is for a range of 
the slices. For example if
+using 1024 slices and running 4 Projection instances the slice ranges would be 
0-255, 256-511, 512-767, 768-1023.
+Changing to 8 slice ranges means that the ranges would be 0-127, 128-255, 
256-383, ..., 768-895, 896-1023.
+
+However, when changing the number of slices the projections with the old slice 
distribution must be
+stopped before starting new projections. That can be done with a full shutdown 
before deploying the
+new slice distribution or pause (stop) the projections with @ref:[the 
management API](management.md).
+
+When using `R2dbcProjection` together with the 
`EventSourcedProvider.eventsBySlices` the events will be delivered in
+sequence number order without duplicates.
+
+When using `R2dbcProjection` together with 
`DurableStateSourceProvider.changesBySlices` the changes will be delivered
+in revision number order without duplicates.
+
+## exactly-once
+
+The offset is stored in the same transaction used for the user defined 
`handler`, which means exactly-once
+processing semantics if the projection is restarted from previously stored 
offset.
+
+Scala
+:  @@snip 
[R2dbcProjectionDocExample.scala](/r2dbc/src/test/scala/docs/home/projection/R2dbcProjectionDocExample.scala)
 { #exactlyOnce }
+
+Java
+:  @@snip 
[R2dbcProjectionDocExample.java](/r2dbc/src/test/java/jdocs/home/projection/R2dbcProjectionDocExample.java)
 { #exactlyOnce }
+
+The @ref:[`ShoppingCartHandler` is shown below](#handler).
+
+## at-least-once
+
+The offset is stored after the envelope has been processed and giving 
at-least-once processing semantics.
+This means that if the projection is restarted from a previously stored offset 
some elements may be processed more
+than once. Therefore, the @ref:[Handler](#handler) code must be idempotent.
+
+Scala
+:  @@snip 
[R2dbcProjectionDocExample.scala](/r2dbc/src/test/scala/docs/home/projection/R2dbcProjectionDocExample.scala)
 { #atLeastOnce }
+
+Java
+:  @@snip 
[R2dbcProjectionDocExample.java](/r2dbc/src/test/java/jdocs/home/projection/R2dbcProjectionDocExample.java)
 { #atLeastOnce }
+
+The offset is stored after a time window, or limited by a number of envelopes, 
whatever happens first.
+This window can be defined with `withSaveOffset` of the returned 
`AtLeastOnceProjection`.
+The default settings for the window is defined in configuration section 
`pekko.projection.at-least-once`.
+There is a performance benefit of not storing the offset too often, but the 
drawback is that there can be more
+duplicates when the projection that will be processed again when the 
projection is restarted.
+
+The @ref:[`ShoppingCartHandler` is shown below](#handler).
+
+## groupedWithin
+
+The envelopes can be grouped before processing, which can be useful for batch 
updates.
+
+Scala
+:  @@snip 
[R2dbcProjectionDocExample.scala](/r2dbc/src/test/scala/docs/home/projection/R2dbcProjectionDocExample.scala)
 { #grouped }
+
+Java
+:  @@snip 
[R2dbcProjectionDocExample.java](/r2dbc/src/test/java/jdocs/home/projection/R2dbcProjectionDocExample.java)
 { #grouped }
+
+The envelopes are grouped within a time window, or limited by a number of 
envelopes, whatever happens first.
+This window can be defined with `withGroup` of the returned 
`GroupedProjection`. The default settings for
+the window is defined in configuration section `pekko.projection.grouped`.
+
+When using `groupedWithin` the handler is a 
@scala[`R2dbcHandler[immutable.Seq[EventEnvelope[ShoppingCart.Event]]]`]@java[`R2dbcHandler<List<EventEnvelope<ShoppingCart.Event>>>`].
+The @ref:[`GroupedShoppingCartHandler` is shown below](#grouped-handler).
+
+The offset is stored in the same transaction used for the user defined 
`handler`, which means exactly-once
+processing semantics if the projection is restarted from previously stored 
offset.
+
+## Handler
+
+It's in the @apidoc[R2dbcHandler] that you implement the processing of each 
envelope. It's essentially a consumer function
+from `(R2dbcSession, Envelope)` to 
@scala[`Future[Done]`]@java[`CompletionStage<Done>`].
+
+A handler that is consuming `ShoppingCart.Event` from `eventsBySlices` can 
look like this:
+
+Scala
+:  @@snip 
[R2dbcProjectionDocExample.scala](/r2dbc/src/test/scala/docs/home/projection/R2dbcProjectionDocExample.scala)
 { #handler }
+
+Java
+:  @@snip 
[R2dbcProjectionDocExample.java](/r2dbc/src/test/java/jdocs/home/projection/R2dbcProjectionDocExample.java)
 { #handler }
+
+@@@ note { title=Hint }
+Such simple handlers can also be defined as plain functions via the helper 
@scala[`R2dbcHandler.apply`]@java[`R2dbcHandler.fromFunction`] factory method.
+@@@
+
+### Grouped handler
+
+When using @ref:[`R2dbcProjection.groupedWithin`](#groupedwithin) the handler 
is processing a @scala[`Seq`]@java[`List`] of envelopes.
+
+Scala
+:  @@snip 
[R2dbcProjectionDocExample.scala](/r2dbc/src/test/scala/docs/home/projection/R2dbcProjectionDocExample.scala)
 { #grouped-handler }
+
+Java
+:  @@snip 
[R2dbcProjectionDocExample.java](/r2dbc/src/test/java/jdocs/home/projection/R2dbcProjectionDocExample.java)
 { #grouped-handler }
+
+### Stateful handler
+
+The @apidoc[R2dbcHandler] can be stateful, with variables and mutable data 
structures. It is invoked by the `Projection` machinery
+one envelope at a time and visibility guarantees between the invocations are 
handled automatically, i.e. no volatile
+or other concurrency primitives are needed for managing the state as long as 
it's not accessed by other threads
+than the one that called `process`.
+
+@@@ note
+
+It is important that the `Handler` instance is not shared between several 
`Projection` instances,
+because then it would be invoked concurrently, which is not how it is intended 
to be used. Each `Projection`
+instance should use a new `Handler` instance.
+
+@@@
+
+### Async handler
+
+The @apidoc[Handler] can be used with `R2dbcProjection.atLeastOnceAsync` and
+`R2dbcProjection.groupedWithinAsync` if the handler is not storing the 
projection result in the database.
+The handler could send to a Kafka topic or integrate with something else.
+
+There are several examples of such `Handler` in the @ref:[documentation for 
Cassandra Projections](cassandra.md#handler).
+Same type of handlers can be used with `R2dbcProjection` instead of 
`CassandraProjection`.
+
+### Actor handler
+
+A good alternative for advanced state management is to implement the handler 
as an
+@extref:[actor](pekko:typed/typed/actors.html) which is described in
+@ref:[Processing with Actor](actor.md).
+
+### Flow handler
+
+A Pekko Streams `FlowWithContext` can be used instead of a handler for 
processing the envelopes,
+which is described in @ref:[Processing with Pekko Streams](flow.md).
+
+### Handler lifecycle
+
+You can override the `start` and `stop` methods of the `R2dbcHandler` to 
implement initialization
+before first envelope is processed and resource cleanup when the projection is 
stopped.
+Those methods are also called when the `Projection` is restarted after failure.
+
+See also @ref:[error handling](error.md).
+
+## Offset types
+
+The supported offset types of the `R2dbcProjection` are:
+
+* @apidoc[pekko.persistence.query.TimestampOffset] that is used for 
@ref:[SourceProvider for 
eventsBySlices](eventsourced.md#sourceprovider-for-eventsbyslices) and 
@ref:[SourceProvider for 
changesBySlices](durable-state.md#sourceprovider-for-changesbyslices)
+* other @apidoc[pekko.persistence.query.Offset] types
+* @apidoc[MergeableOffset] that is used for @ref:[messages from 
Kafka](kafka.md#mergeable-offset)
+* `String`
+* `Int`
+* `Long`
+* Any other type that has a configured Pekko Serializer is stored with base64 
encoding of the serialized bytes.
+
+## Publish events for lower latency
+
+See @extref:[eventsBySlices 
documentation](pekko-persistence-r2dbc:query.html#publish-events-for-lower-latency-of-eventsbyslices).
+
+
+## Multiple plugins
+
+Just like how multiple plugins can be configured as described for @extref[the 
R2DBC persistence plugin](pekko-persistence-r2dbc:config.html#multiple-plugins) 
multiple projection configurations are possible.
+
+For Projection offset store you need another config section:
+
+@@snip 
[conf](/r2dbc/src/test/scala/docs/home/projection/R2dbcProjectionDocExample.scala){#second-projection-config}
+
+Note that the `use-connection-factory` property references the same connection 
settings as is used for the `second-r2dbc` plugins, but it could also
+have been a separate connection pool configured as:
+
+@@snip 
[conf](/r2dbc/src/test/scala/docs/home/projection/R2dbcProjectionDocExample.scala){#second-projection-config-with-connection-factory}
+
+In that way you can use the default plugins for the write side and Projection 
`SourceProvider`, but use a separate database for the Projection
+handlers and offset storage.
+
+You start the Projections with the `ProjectionSettings` loaded from 
`"second-projection-r2dbc"`.
+
+Scala
+:  @@snip 
[Example.scala](/r2dbc/src/test/scala/docs/home/projection/R2dbcProjectionDocExample.scala){#projectionSettings}
+
+Java
+:  @@snip 
[Example.java](/r2dbc/src/test/java/jdocs/home/projection/R2dbcProjectionDocExample.java){#projectionSettings}
diff --git a/project/Dependencies.scala b/project/Dependencies.scala
index 970e302..cfa1d16 100644
--- a/project/Dependencies.scala
+++ b/project/Dependencies.scala
@@ -19,6 +19,8 @@ object Dependencies {
   val ScalaVersions = Seq(Scala213, Scala3)
 
   val PekkoVersionInDocs = PekkoCoreDependency.default.link
+  // change PekkoPersistenceR2dbcVersionInDocs when 2.0.0-M1 is released
+  val PekkoPersistenceR2dbcVersionInDocs = "current"
   val ConnectorsVersionInDocs = PekkoConnectorsDependency.default.link
   val ConnectorsKafkaVersionInDocs = 
PekkoConnectorsKafkaDependency.default.link
 
@@ -47,6 +49,7 @@ object Dependencies {
     val pekkoPersistenceQuery = "org.apache.pekko" %% 
"pekko-persistence-query" % Versions.pekko
     val pekkoPersistenceTyped = "org.apache.pekko" %% 
"pekko-persistence-typed" % Versions.pekko
     val pekkoGrpcRuntime = "org.apache.pekko" %% "pekko-grpc-runtime" % 
Versions.pekkoGrpc
+    val pekkoPersistenceR2dbc = "org.apache.pekko" %% 
"pekko-persistence-r2dbc" % Versions.pekkoPersistenceR2dbc
 
     // TestKit in compile scope for ProjectionTestKit
     val pekkoTypedTestkit = "org.apache.pekko" %% "pekko-actor-testkit-typed" 
% Versions.pekko
@@ -61,65 +64,56 @@ object Dependencies {
     // must be provided on classpath when using Apache Kafka 2.6.0+
     val jackson = "com.fasterxml.jackson.core" % "jackson-databind" % 
Versions.jackson
 
-    // not really used in lib code, but in example and test
-    val h2Driver = "com.h2database" % "h2" % Versions.h2Driver
-
     val r2dbcSpi = "io.r2dbc" % "r2dbc-spi" % "1.0.0.RELEASE"
     val r2dbcPool = "io.r2dbc" % "r2dbc-pool" % "1.0.2.RELEASE"
     val r2dbcPostgres = "org.postgresql" % "r2dbc-postgresql" % "1.1.1.RELEASE"
     val r2dbcMysql = "io.asyncer" % "r2dbc-mysql" % "1.4.2"
   }
 
-  object TestNonIt {
-    val persistenceTestkit = "org.apache.pekko" %% "pekko-persistence-testkit" 
% Versions.pekko % "test"
-
-    val scalatest = "org.scalatest" %% "scalatest" % Versions.scalaTest % 
"test"
-
-    val logback = "ch.qos.logback" % "logback-classic" % Versions.logback % 
"test"
-  }
-
   object Test {
+    val pekkoClusterShardingTyped = Compile.pekkoClusterShardingTyped % "test"
     val pekkoDiscovery = "org.apache.pekko" %% "pekko-discovery" % 
Versions.pekko % "test"
     val pekkoDistributedData = "org.apache.pekko" %% "pekko-distributed-data" 
% Versions.pekko % "test"
     val pekkoSerializationJackson = "org.apache.pekko" %% 
"pekko-serialization-jackson" % Versions.pekko % "test"
-    val pekkoTypedTestkit = Compile.pekkoTypedTestkit
-    val pekkoStreamTestkit = Compile.pekkoStreamTestkit
+    val pekkoTypedTestkit = Compile.pekkoTypedTestkit % "test"
+    val pekkoStreamTestkit = Compile.pekkoStreamTestkit % "test"
     val persistenceTestkit = "org.apache.pekko" %% "pekko-persistence-testkit" 
% Versions.pekko % "test"
 
-    val scalatest = "org.scalatest" %% "scalatest" % Versions.scalaTest
+    val scalatest = "org.scalatest" %% "scalatest" % Versions.scalaTest % 
"test"
     val scalatestJUnit = "org.scalatestplus" %% "junit-4-13" % 
(Versions.scalaTest + ".0")
-    val junit = "junit" % "junit" % Versions.junit
+    val junit = "junit" % "junit" % Versions.junit % "test"
 
-    val h2Driver = Compile.h2Driver
-    val postgresDriver = "org.postgresql" % "postgresql" % "42.7.11"
-    val mysqlDriver = "com.mysql" % "mysql-connector-j" % "9.7.0"
-    val msSQLServerDriver = "com.microsoft.sqlserver" % "mssql-jdbc" % 
"13.4.0.jre11"
-    val oracleDriver = "com.oracle.ojdbc" % "ojdbc8" % "19.3.0.0"
+    val h2Driver = "com.h2database" % "h2" % Versions.h2Driver % "test"
+    val postgresDriver = "org.postgresql" % "postgresql" % "42.7.11" % "test"
+    val mysqlDriver = "com.mysql" % "mysql-connector-j" % "9.7.0" % "test"
+    val msSQLServerDriver = "com.microsoft.sqlserver" % "mssql-jdbc" % 
"13.4.0.jre11" % "test"
+    val oracleDriver = "com.oracle.ojdbc" % "ojdbc8" % "19.3.0.0" % "test"
 
-    val logback = "ch.qos.logback" % "logback-classic" % Versions.logback
+    val logback = "ch.qos.logback" % "logback-classic" % Versions.logback % 
"test"
 
     val cassandraContainer =
-      "org.testcontainers" % "testcontainers-cassandra" % 
Versions.testContainers
+      "org.testcontainers" % "testcontainers-cassandra" % 
Versions.testContainers % "test"
     val postgresContainer =
-      "org.testcontainers" % "testcontainers-postgresql" % 
Versions.testContainers
+      "org.testcontainers" % "testcontainers-postgresql" % 
Versions.testContainers % "test"
     val mysqlContainer =
-      "org.testcontainers" % "testcontainers-mysql" % Versions.testContainers
+      "org.testcontainers" % "testcontainers-mysql" % Versions.testContainers 
% "test"
     val msSQLServerContainer =
-      "org.testcontainers" % "testcontainers-mssqlserver" % 
Versions.testContainers
+      "org.testcontainers" % "testcontainers-mssqlserver" % 
Versions.testContainers % "test"
 
     val oracleDbContainer =
-      "org.testcontainers" % "testcontainers-oracle-xe" % 
Versions.testContainers
+      "org.testcontainers" % "testcontainers-oracle-xe" % 
Versions.testContainers % "test"
 
     val connectorsKafkaTestkit =
       "org.apache.pekko" %% "pekko-connectors-kafka-testkit" % 
Versions.connectorsKafka
 
     val r2dbcPostgres = Compile.r2dbcPostgres % "test"
+    val r2dbcMysql = Compile.r2dbcMysql % "test"
   }
 
   object Examples {
     val hibernate = "org.hibernate" % "hibernate-core" % "7.3.5.Final"
 
-    val pekkoClusterShardingTyped = "org.apache.pekko" %% 
"pekko-cluster-sharding-typed" % Versions.pekko
+    val pekkoClusterShardingTyped = Compile.pekkoClusterShardingTyped
     val pekkoPersistenceCassandra =
       "org.apache.pekko" %% "pekko-persistence-cassandra" % 
Versions.pekkoPersistenceCassandra
     val pekkoPersistenceJdbc = "org.apache.pekko" %% "pekko-persistence-jdbc" 
% Versions.pekkoPersistenceJdbc
@@ -136,30 +130,30 @@ object Dependencies {
       // pekko-persistence-query is only needed for OffsetSerialization and to 
provide a typed EventEnvelope that
       // references the Offset type from pekko-persistence.
       Compile.pekkoPersistenceQuery,
-      Test.pekkoTypedTestkit % "test",
-      Test.logback % "test",
-      Test.scalatest % "test")
+      Test.pekkoTypedTestkit,
+      Test.logback,
+      Test.scalatest)
 
   val coreTest =
     deps ++= Seq(
-      Test.pekkoTypedTestkit % "test",
-      Test.pekkoStreamTestkit % "test",
-      Test.scalatest % "test",
-      Test.scalatestJUnit % "test",
-      Test.junit % "test",
-      Test.logback % "test")
+      Test.pekkoTypedTestkit,
+      Test.pekkoStreamTestkit,
+      Test.scalatest,
+      Test.scalatestJUnit,
+      Test.junit,
+      Test.logback)
 
   val testKit =
     deps ++= Seq(
       Compile.pekkoTypedTestkit,
       Compile.pekkoStreamTestkit,
-      Test.scalatest % "test",
-      Test.scalatestJUnit % "test",
-      Test.junit % "test",
-      Test.logback % "test")
+      Test.scalatest,
+      Test.scalatestJUnit,
+      Test.junit,
+      Test.logback)
 
   val eventsourced =
-    deps ++= Seq(Compile.pekkoPersistenceQuery, TestNonIt.persistenceTestkit, 
TestNonIt.scalatest, TestNonIt.logback)
+    deps ++= Seq(Compile.pekkoPersistenceQuery, Test.persistenceTestkit, 
Test.scalatest, Test.logback)
 
   val state =
     deps ++= Seq(Compile.pekkoPersistenceQuery, Test.persistenceTestkit, 
Test.pekkoStreamTestkit, Test.scalatest)
@@ -167,50 +161,50 @@ object Dependencies {
   val jdbc =
     deps ++= Seq(
       Compile.pekkoPersistenceQuery,
-      Test.pekkoTypedTestkit % "test",
-      Test.h2Driver % "test",
-      Test.postgresDriver % "test",
-      Test.postgresContainer % "test",
-      Test.mysqlDriver % "test",
-      Test.mysqlContainer % "test",
-      Test.msSQLServerDriver % "test",
-      Test.msSQLServerContainer % "test",
-      Test.oracleDriver % "test",
-      Test.oracleDbContainer % "test",
-      Test.logback % "test")
+      Test.pekkoTypedTestkit,
+      Test.h2Driver,
+      Test.postgresDriver,
+      Test.postgresContainer,
+      Test.mysqlDriver,
+      Test.mysqlContainer,
+      Test.msSQLServerDriver,
+      Test.msSQLServerContainer,
+      Test.oracleDriver,
+      Test.oracleDbContainer,
+      Test.logback)
 
   val slick =
     deps ++= Seq(
       Compile.slick,
       Compile.pekkoPersistenceQuery,
-      Test.pekkoTypedTestkit % "test",
-      Test.h2Driver % "test",
-      Test.postgresDriver % "test",
-      Test.postgresContainer % "test",
-      Test.mysqlDriver % "test",
-      Test.mysqlContainer % "test",
-      Test.msSQLServerDriver % "test",
-      Test.msSQLServerContainer % "test",
-      Test.oracleDriver % "test",
-      Test.oracleDbContainer % "test",
-      Test.logback % "test")
+      Test.pekkoTypedTestkit,
+      Test.h2Driver,
+      Test.postgresDriver,
+      Test.postgresContainer,
+      Test.mysqlDriver,
+      Test.mysqlContainer,
+      Test.msSQLServerDriver,
+      Test.msSQLServerContainer,
+      Test.oracleDriver,
+      Test.oracleDbContainer,
+      Test.logback)
 
   val cassandra =
     deps ++= Seq(
       Compile.connectorsCassandra,
       Compile.pekkoPersistenceQuery,
-      Test.pekkoTypedTestkit % "test",
-      Test.logback % "test",
-      Test.cassandraContainer % "test",
-      Test.scalatest % "test",
-      Test.scalatestJUnit % "test")
+      Test.pekkoTypedTestkit,
+      Test.logback,
+      Test.cassandraContainer,
+      Test.scalatest,
+      Test.scalatestJUnit)
 
   val kafka =
     deps ++= Seq(
       Compile.connectorsKafka,
       Compile.jackson,
-      Test.scalatest % "test",
-      Test.logback % "test")
+      Test.scalatest,
+      Test.logback)
 
   val grpc =
     deps ++= Seq(
@@ -225,62 +219,62 @@ object Dependencies {
 
   val grpcTest =
     deps ++= Seq(
-      "org.apache.pekko" %% "pekko-projection-r2dbc" % 
Versions.pekkoPersistenceR2dbc % "test",
-      Test.postgresDriver % "test",
-      Compile.pekkoClusterShardingTyped % "test",
+      Test.postgresDriver,
+      Test.pekkoClusterShardingTyped,
       Test.pekkoSerializationJackson,
       Test.pekkoDiscovery,
-      Test.pekkoTypedTestkit % "test",
-      Test.pekkoStreamTestkit % "test",
-      Test.postgresContainer % "test",
-      Test.logback % "test",
-      Test.scalatest % "test")
+      Test.pekkoTypedTestkit,
+      Test.pekkoStreamTestkit,
+      Test.postgresContainer,
+      Test.logback,
+      Test.scalatest)
 
   val grpcIntTest =
     deps ++= Seq(
-      Compile.pekkoClusterShardingTyped % "test",
-      Test.postgresDriver % "test",
+      Test.pekkoClusterShardingTyped,
+      Test.postgresDriver,
       Test.pekkoSerializationJackson,
       Test.pekkoDiscovery,
-      Test.pekkoTypedTestkit % "test",
-      Test.postgresContainer % "test",
+      Test.pekkoTypedTestkit,
+      Test.postgresContainer,
       Test.r2dbcPostgres,
-      Test.logback % "test",
-      Test.scalatest % "test")
+      Test.logback,
+      Test.scalatest)
 
   val r2dbc =
     deps ++= Seq(
-      "org.apache.pekko" %% "pekko-persistence-r2dbc" % 
Versions.pekkoPersistenceR2dbc,
+      Compile.pekkoPersistenceR2dbc,
       Compile.pekkoPersistenceQuery,
       Compile.r2dbcSpi,
       Compile.r2dbcPool,
       Compile.r2dbcPostgres % "provided",
-      Compile.r2dbcMysql % "provided")
+      Compile.r2dbcMysql % "provided",
+      Test.pekkoClusterShardingTyped)
 
   val r2dbcIntTest =
     deps ++= Seq(
-      "org.apache.pekko" %% "pekko-persistence-r2dbc" % 
Versions.pekkoPersistenceR2dbc,
+      Compile.pekkoPersistenceR2dbc,
       Compile.pekkoPersistenceQuery,
       Compile.r2dbcSpi,
       Compile.r2dbcPool,
-      Compile.r2dbcPostgres % "test",
-      Compile.r2dbcMysql % "test",
+      Test.r2dbcPostgres,
+      Test.r2dbcMysql,
       Test.pekkoSerializationJackson,
       Test.pekkoDiscovery,
       Test.pekkoDistributedData,
-      Test.pekkoTypedTestkit % "test",
-      Test.pekkoStreamTestkit % "test",
-      Test.logback % "test",
-      Test.scalatest % "test")
+      Test.pekkoTypedTestkit,
+      Test.pekkoStreamTestkit,
+      Test.logback,
+      Test.scalatest)
 
   val kafkaTest =
     deps ++= Seq(
-      Test.scalatest % "test",
-      Test.pekkoTypedTestkit % "test",
-      Test.pekkoStreamTestkit % "test",
-      Test.connectorsKafkaTestkit % "test",
-      Test.logback % "test",
-      Test.scalatestJUnit % "test")
+      Test.scalatest,
+      Test.pekkoTypedTestkit,
+      Test.pekkoStreamTestkit,
+      Test.connectorsKafkaTestkit,
+      Test.logback,
+      Test.scalatestJUnit)
 
   val examples =
     deps ++= Seq(
@@ -289,8 +283,8 @@ object Dependencies {
       Examples.pekkoPersistenceCassandra,
       Examples.pekkoPersistenceJdbc,
       Examples.hibernate,
-      Test.h2Driver % "test",
-      Test.pekkoTypedTestkit % "test",
-      Test.logback % "test",
-      Test.cassandraContainer % "test")
+      Test.h2Driver,
+      Test.pekkoTypedTestkit,
+      Test.logback,
+      Test.cassandraContainer)
 }
diff --git a/ddl-scripts/create_tables_postgres.sql 
b/r2dbc-int-test/ddl-scripts/create_tables_postgres.sql
similarity index 100%
copy from ddl-scripts/create_tables_postgres.sql
copy to r2dbc-int-test/ddl-scripts/create_tables_postgres.sql
diff --git a/ddl-scripts/create_tables_postgres.sql 
b/r2dbc-int-test/ddl-scripts/create_tables_yugabyte.sql
similarity index 82%
rename from ddl-scripts/create_tables_postgres.sql
rename to r2dbc-int-test/ddl-scripts/create_tables_yugabyte.sql
index ebb91c6..af5528c 100644
--- a/ddl-scripts/create_tables_postgres.sql
+++ b/r2dbc-int-test/ddl-scripts/create_tables_yugabyte.sql
@@ -18,11 +18,12 @@ CREATE TABLE IF NOT EXISTS event_journal(
   meta_ser_manifest VARCHAR(255),
   meta_payload BYTEA,
 
-  PRIMARY KEY(persistence_id, seq_nr)
+  PRIMARY KEY(persistence_id HASH, seq_nr ASC)
 );
 
 -- `event_journal_slice_idx` is only needed if the slice based queries are used
-CREATE INDEX IF NOT EXISTS event_journal_slice_idx ON event_journal(slice, 
entity_type, db_timestamp, seq_nr);
+CREATE INDEX IF NOT EXISTS event_journal_slice_idx ON event_journal(slice ASC, 
entity_type ASC, db_timestamp ASC, seq_nr ASC, persistence_id, deleted)
+  SPLIT AT VALUES ((127), (255), (383), (511), (639), (767), (895));
 
 CREATE TABLE IF NOT EXISTS snapshot(
   slice INT NOT NULL,
@@ -37,7 +38,7 @@ CREATE TABLE IF NOT EXISTS snapshot(
   meta_ser_manifest VARCHAR(255),
   meta_payload BYTEA,
 
-  PRIMARY KEY(persistence_id)
+  PRIMARY KEY(persistence_id HASH)
 );
 
 CREATE TABLE IF NOT EXISTS durable_state (
@@ -52,11 +53,12 @@ CREATE TABLE IF NOT EXISTS durable_state (
   state_payload BYTEA NOT NULL,
   tags TEXT ARRAY,
 
-  PRIMARY KEY(persistence_id, revision)
+  PRIMARY KEY(persistence_id HASH, revision ASC)
 );
 
 -- `durable_state_slice_idx` is only needed if the slice based queries are used
-CREATE INDEX IF NOT EXISTS durable_state_slice_idx ON durable_state(slice, 
entity_type, db_timestamp, revision);
+CREATE INDEX IF NOT EXISTS durable_state_slice_idx ON durable_state(slice ASC, 
entity_type ASC, db_timestamp ASC, revision ASC, persistence_id)
+  SPLIT AT VALUES ((127), (255), (383), (511), (639), (767), (895));
 
 -- Primitive offset types are stored in this table.
 -- If only timestamp based offsets are used this table is optional.
@@ -72,6 +74,7 @@ CREATE TABLE IF NOT EXISTS projection_offset_store (
 );
 
 -- Timestamp based offsets are stored in this table.
+
 CREATE TABLE IF NOT EXISTS projection_timestamp_offset_store (
   projection_name VARCHAR(255) NOT NULL,
   projection_key VARCHAR(255) NOT NULL,
@@ -83,8 +86,8 @@ CREATE TABLE IF NOT EXISTS projection_timestamp_offset_store (
   -- timestamp_consumed is when the offset was stored
   -- the consumer lag is timestamp_consumed - timestamp_offset
   timestamp_consumed timestamp with time zone NOT NULL,
-  PRIMARY KEY(slice, projection_name, timestamp_offset, persistence_id, seq_nr)
-);
+  PRIMARY KEY(slice ASC, projection_name ASC, timestamp_offset ASC, 
persistence_id ASC, seq_nr ASC)
+) SPLIT AT VALUES ((127), (255), (383), (511), (639), (767), (895));
 
 CREATE TABLE IF NOT EXISTS projection_management (
   projection_name VARCHAR(255) NOT NULL,
diff --git 
a/r2dbc/src/test/java/jdocs/home/projection/R2dbcProjectionDocExample.java 
b/r2dbc/src/test/java/jdocs/home/projection/R2dbcProjectionDocExample.java
new file mode 100644
index 0000000..84a61c9
--- /dev/null
+++ b/r2dbc/src/test/java/jdocs/home/projection/R2dbcProjectionDocExample.java
@@ -0,0 +1,283 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022-2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package jdocs.home.projection;
+
+import org.apache.pekko.Done;
+import org.apache.pekko.actor.typed.ActorSystem;
+import org.apache.pekko.actor.typed.javadsl.Behaviors;
+import org.apache.pekko.cluster.sharding.typed.javadsl.EntityTypeKey;
+import org.apache.pekko.japi.Pair;
+import org.apache.pekko.persistence.query.Offset;
+import org.apache.pekko.persistence.r2dbc.query.javadsl.R2dbcReadJournal;
+import org.apache.pekko.projection.eventsourced.javadsl.EventSourcedProvider;
+import org.apache.pekko.projection.javadsl.SourceProvider;
+
+import docs.home.CborSerializable;
+
+import java.time.Duration;
+import java.time.Instant;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Optional;
+
+// #handler
+// #grouped-handler
+import org.apache.pekko.projection.r2dbc.javadsl.R2dbcHandler;
+import org.apache.pekko.projection.r2dbc.javadsl.R2dbcSession;
+import io.r2dbc.spi.Statement;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionStage;
+
+// #grouped-handler
+// #handler
+
+// #initProjections
+import org.apache.pekko.cluster.sharding.typed.javadsl.ShardedDaemonProcess;
+import org.apache.pekko.projection.ProjectionBehavior;
+import org.apache.pekko.persistence.query.typed.EventEnvelope;
+import org.apache.pekko.projection.Projection;
+// #initProjections
+
+// #exactlyOnce
+// #atLeastOnce
+// #grouped
+// #initProjections
+import org.apache.pekko.projection.ProjectionId;
+import org.apache.pekko.projection.r2dbc.R2dbcProjectionSettings;
+import org.apache.pekko.projection.r2dbc.javadsl.R2dbcProjection;
+
+// #initProjections
+// #grouped
+// #atLeastOnce
+// #exactlyOnce
+
+@SuppressWarnings({"unused", "InnerClassMayBeStatic"})
+class R2dbcProjectionDocExample {
+
+  static class ShoppingCart {
+    public static EntityTypeKey<Command> ENTITY_TYPE_KEY =
+        EntityTypeKey.create(Command.class, "ShoppingCart");
+
+    interface Command extends CborSerializable {}
+
+    interface Event {
+      String getCartId();
+    }
+
+    public static class CheckedOut implements Event {
+
+      public final String cartId;
+      public final Instant eventTime;
+
+      public CheckedOut(String cartId, Instant eventTime) {
+        this.cartId = cartId;
+        this.eventTime = eventTime;
+      }
+
+      public String getCartId() {
+        return cartId;
+      }
+
+      @Override
+      public String toString() {
+        return "CheckedOut(" + cartId + "," + eventTime + ")";
+      }
+    }
+  }
+
+  // #handler
+  public class ShoppingCartHandler extends 
R2dbcHandler<EventEnvelope<ShoppingCart.Event>> {
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    @Override
+    public CompletionStage<Done> process(
+        R2dbcSession session, EventEnvelope<ShoppingCart.Event> envelope) {
+      ShoppingCart.Event event = envelope.event();
+      if (event instanceof ShoppingCart.CheckedOut) {
+        ShoppingCart.CheckedOut checkedOut = (ShoppingCart.CheckedOut) event;
+        logger.info(
+            "Shopping cart {} was checked out at {}", checkedOut.cartId, 
checkedOut.eventTime);
+
+        Statement stmt =
+            session
+                .createStatement("INSERT into order (id, time) VALUES ($1, 
$2)")
+                .bind(0, checkedOut.cartId)
+                .bind(1, checkedOut.eventTime);
+        return session.updateOne(stmt).thenApply(rowsUpdated -> 
Done.getInstance());
+
+      } else {
+        logger.debug("Shopping cart {} changed by {}", event.getCartId(), 
event);
+        return CompletableFuture.completedFuture(Done.getInstance());
+      }
+    }
+  }
+
+  // #handler
+
+  // #grouped-handler
+  public class GroupedShoppingCartHandler
+      extends R2dbcHandler<List<EventEnvelope<ShoppingCart.Event>>> {
+    private final Logger logger = LoggerFactory.getLogger(getClass());
+
+    @Override
+    public CompletionStage<Done> process(
+        R2dbcSession session, List<EventEnvelope<ShoppingCart.Event>> 
envelopes) {
+      List<Statement> stmts = new ArrayList<>();
+      for (EventEnvelope<ShoppingCart.Event> envelope : envelopes) {
+        ShoppingCart.Event event = envelope.event();
+        if (event instanceof ShoppingCart.CheckedOut) {
+          ShoppingCart.CheckedOut checkedOut = (ShoppingCart.CheckedOut) event;
+          logger.info(
+              "Shopping cart {} was checked out at {}", checkedOut.cartId, 
checkedOut.eventTime);
+
+          Statement stmt =
+              session
+                  .createStatement("INSERT into order (id, time) VALUES ($1, 
$2)")
+                  .bind(0, checkedOut.cartId)
+                  .bind(1, checkedOut.eventTime);
+          stmts.add(stmt);
+        } else {
+          logger.debug("Shopping cart {} changed by {}", event.getCartId(), 
event);
+        }
+      }
+
+      return session.update(stmts).thenApply(rowsUpdated -> 
Done.getInstance());
+    }
+  }
+
+  // #grouped-handler
+
+  ActorSystem<Void> system = ActorSystem.create(Behaviors.empty(), "Example");
+
+  // #initProjections
+  void initProjections() {
+    // Split the slices into 4 ranges
+    int numberOfSliceRanges = 4;
+    List<Pair<Integer, Integer>> sliceRanges =
+        EventSourcedProvider.sliceRanges(
+            system, R2dbcReadJournal.Identifier(), numberOfSliceRanges);
+
+    ShardedDaemonProcess.get(system)
+        .init(
+            ProjectionBehavior.Command.class,
+            "ShoppingCartProjection",
+            sliceRanges.size(),
+            i -> 
ProjectionBehavior.create(createProjection(sliceRanges.get(i))),
+            ProjectionBehavior.stopMessage());
+  }
+
+  Projection<EventEnvelope<ShoppingCart.Event>> createProjection(
+      Pair<Integer, Integer> sliceRange) {
+    int minSlice = sliceRange.first();
+    int maxSlice = sliceRange.second();
+
+    String entityType = ShoppingCart.ENTITY_TYPE_KEY.name();
+
+    SourceProvider<Offset, EventEnvelope<ShoppingCart.Event>> sourceProvider =
+        EventSourcedProvider.eventsBySlices(
+            system, R2dbcReadJournal.Identifier(), entityType, minSlice, 
maxSlice);
+
+    ProjectionId projectionId =
+        ProjectionId.of("ShoppingCarts", "carts-" + minSlice + "-" + maxSlice);
+    Optional<R2dbcProjectionSettings> settings = Optional.empty();
+
+    return R2dbcProjection.exactlyOnce(
+        projectionId, settings, sourceProvider, ShoppingCartHandler::new, 
system);
+  }
+
+  // #initProjections
+
+  // #sourceProvider
+  // Split the slices into 4 ranges
+  int numberOfSliceRanges = 4;
+  List<Pair<Integer, Integer>> sliceRanges =
+      EventSourcedProvider.sliceRanges(system, R2dbcReadJournal.Identifier(), 
numberOfSliceRanges);
+
+  // Example of using the first slice range
+  int minSlice = sliceRanges.get(0).first();
+  int maxSlice = sliceRanges.get(0).second();
+  String entityType = ShoppingCart.ENTITY_TYPE_KEY.name();
+
+  SourceProvider<Offset, EventEnvelope<ShoppingCart.Event>> sourceProvider =
+      EventSourcedProvider.eventsBySlices(
+          system, R2dbcReadJournal.Identifier(), entityType, minSlice, 
maxSlice);
+
+  // #sourceProvider
+
+  {
+    // #exactlyOnce
+    ProjectionId projectionId =
+        ProjectionId.of("ShoppingCarts", "carts-" + minSlice + "-" + maxSlice);
+
+    Optional<R2dbcProjectionSettings> settings = Optional.empty();
+
+    Projection<EventEnvelope<ShoppingCart.Event>> projection =
+        R2dbcProjection.exactlyOnce(
+            projectionId, settings, sourceProvider, ShoppingCartHandler::new, 
system);
+    // #exactlyOnce
+  }
+
+  {
+    // #atLeastOnce
+    ProjectionId projectionId =
+        ProjectionId.of("ShoppingCarts", "carts-" + minSlice + "-" + maxSlice);
+
+    Optional<R2dbcProjectionSettings> settings = Optional.empty();
+
+    int saveOffsetAfterEnvelopes = 100;
+    Duration saveOffsetAfterDuration = Duration.ofMillis(500);
+
+    Projection<EventEnvelope<ShoppingCart.Event>> projection =
+        R2dbcProjection.atLeastOnce(
+                projectionId, settings, sourceProvider, 
ShoppingCartHandler::new, system)
+            .withSaveOffset(saveOffsetAfterEnvelopes, saveOffsetAfterDuration);
+    // #atLeastOnce
+  }
+
+  {
+    // #grouped
+    ProjectionId projectionId =
+        ProjectionId.of("ShoppingCarts", "carts-" + minSlice + "-" + maxSlice);
+
+    Optional<R2dbcProjectionSettings> settings = Optional.empty();
+
+    int saveOffsetAfterEnvelopes = 100;
+    Duration saveOffsetAfterDuration = Duration.ofMillis(500);
+
+    Projection<EventEnvelope<ShoppingCart.Event>> projection =
+        R2dbcProjection.groupedWithin(
+                projectionId, settings, sourceProvider, 
GroupedShoppingCartHandler::new, system)
+            .withGroup(saveOffsetAfterEnvelopes, saveOffsetAfterDuration);
+    // #grouped
+  }
+
+  {
+    // #projectionSettings
+    ProjectionId projectionId =
+        ProjectionId.of("ShoppingCarts", "carts-" + minSlice + "-" + maxSlice);
+
+    Optional<R2dbcProjectionSettings> settings =
+        Optional.of(
+            R2dbcProjectionSettings.create(
+                
system.settings().config().getConfig("second-projection-r2dbc")));
+
+    Projection<EventEnvelope<ShoppingCart.Event>> projection =
+        R2dbcProjection.atLeastOnce(
+            projectionId, settings, sourceProvider, ShoppingCartHandler::new, 
system);
+    // #projectionSettings
+  }
+}
diff --git a/r2dbc/src/test/scala/docs/home/CborSerializable.scala 
b/r2dbc/src/test/scala/docs/home/CborSerializable.scala
new file mode 100644
index 0000000..3ccbac4
--- /dev/null
+++ b/r2dbc/src/test/scala/docs/home/CborSerializable.scala
@@ -0,0 +1,16 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022-2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package docs.home
+
+trait CborSerializable
diff --git 
a/r2dbc/src/test/scala/docs/home/projection/R2dbcProjectionDocExample.scala 
b/r2dbc/src/test/scala/docs/home/projection/R2dbcProjectionDocExample.scala
new file mode 100644
index 0000000..2e0a171
--- /dev/null
+++ b/r2dbc/src/test/scala/docs/home/projection/R2dbcProjectionDocExample.scala
@@ -0,0 +1,274 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2022-2023 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package docs.home.projection
+
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.typed.ActorSystem
+import pekko.actor.typed.scaladsl.Behaviors
+import pekko.cluster.sharding.typed.scaladsl.EntityTypeKey
+import pekko.cluster.sharding.typed.scaladsl.ShardedDaemonProcess
+import pekko.persistence.query.Offset
+import pekko.projection.r2dbc.R2dbcProjectionSettings
+import docs.home.CborSerializable
+import org.slf4j.LoggerFactory
+
+import java.time.Instant
+import scala.annotation.nowarn
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+import scala.concurrent.duration._
+
+//#handler
+//#grouped-handler
+import pekko.persistence.query.typed.EventEnvelope
+import pekko.projection.r2dbc.scaladsl.R2dbcHandler
+import pekko.projection.r2dbc.scaladsl.R2dbcSession
+
+//#grouped-handler
+//#handler
+object R2dbcProjectionDocExample {
+
+  object ShoppingCart {
+    val EntityKey: EntityTypeKey[Command] = 
EntityTypeKey[Command]("ShoppingCart")
+
+    sealed trait Command extends CborSerializable
+
+    sealed trait Event extends CborSerializable {
+      def cartId: String
+    }
+
+    final case class ItemAdded(cartId: String, itemId: String, quantity: Int) 
extends Event
+    final case class ItemRemoved(cartId: String, itemId: String) extends Event
+    final case class ItemQuantityAdjusted(cartId: String, itemId: String, 
newQuantity: Int) extends Event
+    final case class CheckedOut(cartId: String, eventTime: Instant) extends 
Event
+  }
+
+  // #handler
+  class ShoppingCartHandler()(implicit ec: ExecutionContext) extends 
R2dbcHandler[EventEnvelope[ShoppingCart.Event]] {
+    private val logger = LoggerFactory.getLogger(getClass)
+
+    override def process(session: R2dbcSession, envelope: 
EventEnvelope[ShoppingCart.Event]): Future[Done] = {
+      envelope.event match {
+        case ShoppingCart.CheckedOut(cartId, time) =>
+          logger.info(s"Shopping cart $cartId was checked out at $time")
+          val stmt = session
+            .createStatement("INSERT into order (id, time) VALUES ($1, $2)")
+            .bind(0, cartId)
+            .bind(1, time)
+          session
+            .updateOne(stmt)
+            .map(_ => Done)
+
+        case otherEvent =>
+          logger.debug(s"Shopping cart ${otherEvent.cartId} changed by 
$otherEvent")
+          Future.successful(Done)
+      }
+    }
+  }
+  // #handler
+
+  // #grouped-handler
+  import scala.collection.immutable
+
+  class GroupedShoppingCartHandler()(implicit ec: ExecutionContext)
+      extends R2dbcHandler[immutable.Seq[EventEnvelope[ShoppingCart.Event]]] {
+    private val logger = LoggerFactory.getLogger(getClass)
+
+    override def process(
+        session: R2dbcSession,
+        envelopes: immutable.Seq[EventEnvelope[ShoppingCart.Event]]): 
Future[Done] = {
+
+      // save all events in DB
+      val stmts = envelopes
+        .map(_.event)
+        .collect {
+          case ShoppingCart.CheckedOut(cartId, time) =>
+            logger.info(s"Shopping cart $cartId was checked out at $time")
+
+            session
+              .createStatement("INSERT into order (id, time) VALUES ($1, $2)")
+              .bind(0, cartId)
+              .bind(1, time)
+
+        }
+        .toVector
+
+      session.update(stmts).map(_ => Done)
+    }
+  }
+  // #grouped-handler
+
+  implicit val system: ActorSystem[Nothing] = ActorSystem(Behaviors.empty, 
"Example")
+  implicit val ec: ExecutionContext = system.executionContext
+
+  object IllustrateInit {
+    // #initProjections
+    import pekko.persistence.query.typed.EventEnvelope
+    import pekko.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
+    import pekko.projection.Projection
+    import pekko.projection.ProjectionBehavior
+    import pekko.projection.ProjectionId
+    import pekko.projection.eventsourced.scaladsl.EventSourcedProvider
+    import pekko.projection.r2dbc.scaladsl.R2dbcProjection
+    import pekko.projection.scaladsl.SourceProvider
+
+    def initProjections(): Unit = {
+      def sourceProvider(sliceRange: Range): SourceProvider[Offset, 
EventEnvelope[ShoppingCart.Event]] =
+        EventSourcedProvider
+          .eventsBySlices[ShoppingCart.Event](
+            system,
+            readJournalPluginId = R2dbcReadJournal.Identifier,
+            entityType,
+            sliceRange.min,
+            sliceRange.max)
+
+      def projection(sliceRange: Range): 
Projection[EventEnvelope[ShoppingCart.Event]] = {
+        val minSlice = sliceRange.min
+        val maxSlice = sliceRange.max
+        val projectionId = ProjectionId("ShoppingCarts", 
s"carts-$minSlice-$maxSlice")
+
+        R2dbcProjection
+          .exactlyOnce(
+            projectionId,
+            settings = None,
+            sourceProvider(sliceRange),
+            handler = () => new ShoppingCartHandler)
+      }
+
+      // Split the slices into 4 ranges
+      val numberOfSliceRanges: Int = 4
+      val sliceRanges = EventSourcedProvider.sliceRanges(system, 
R2dbcReadJournal.Identifier, numberOfSliceRanges)
+
+      ShardedDaemonProcess(system).init(
+        name = "ShoppingCartProjection",
+        numberOfInstances = sliceRanges.size,
+        behaviorFactory = i => ProjectionBehavior(projection(sliceRanges(i))),
+        stopMessage = ProjectionBehavior.Stop)
+    }
+    // #initProjections
+  }
+
+  // #sourceProvider
+  import pekko.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
+  import pekko.projection.eventsourced.scaladsl.EventSourcedProvider
+  import pekko.projection.scaladsl.SourceProvider
+
+  // Slit the slices into 4 ranges
+  val numberOfSliceRanges: Int = 4
+  val sliceRanges = EventSourcedProvider.sliceRanges(system, 
R2dbcReadJournal.Identifier, numberOfSliceRanges)
+
+  // Example of using the first slice range
+  val minSlice: Int = sliceRanges.head.min
+  val maxSlice: Int = sliceRanges.head.max
+  val entityType: String = ShoppingCart.EntityKey.name
+
+  val sourceProvider: SourceProvider[Offset, 
EventEnvelope[ShoppingCart.Event]] =
+    EventSourcedProvider
+      .eventsBySlices[ShoppingCart.Event](
+        system,
+        readJournalPluginId = R2dbcReadJournal.Identifier,
+        entityType,
+        minSlice,
+        maxSlice)
+  // #sourceProvider
+
+  object IllustrateExactlyOnce {
+    // #exactlyOnce
+    import pekko.projection.ProjectionId
+    import pekko.projection.r2dbc.scaladsl.R2dbcProjection
+
+    val projectionId = ProjectionId("ShoppingCarts", 
s"carts-$minSlice-$maxSlice")
+
+    val projection =
+      R2dbcProjection
+        .exactlyOnce(projectionId, settings = None, sourceProvider, handler = 
() => new ShoppingCartHandler)
+    // #exactlyOnce
+  }
+
+  object IllustrateAtLeastOnce {
+    // #atLeastOnce
+    import pekko.projection.ProjectionId
+    import pekko.projection.r2dbc.scaladsl.R2dbcProjection
+
+    val projectionId = ProjectionId("ShoppingCarts", 
s"carts-$minSlice-$maxSlice")
+
+    val projection =
+      R2dbcProjection
+        .atLeastOnce(projectionId, settings = None, sourceProvider, handler = 
() => new ShoppingCartHandler)
+        .withSaveOffset(afterEnvelopes = 100, afterDuration = 500.millis)
+    // #atLeastOnce
+  }
+
+  object IllustrateGrouped {
+    // #grouped
+    import pekko.projection.ProjectionId
+    import pekko.projection.r2dbc.scaladsl.R2dbcProjection
+
+    val projectionId = ProjectionId("ShoppingCarts", 
s"carts-$minSlice-$maxSlice")
+
+    val projection =
+      R2dbcProjection
+        .groupedWithin(projectionId, settings = None, sourceProvider, handler 
= () => new GroupedShoppingCartHandler)
+        .withGroup(groupAfterEnvelopes = 20, groupAfterDuration = 500.millis)
+    // #grouped
+  }
+
+  // Ignore Scala 2.13 compiler warning
+  @nowarn("msg=possible missing interpolator")
+  object IllustrateSettings {
+    val config =
+      """
+    // #second-projection-config
+    second-projection-r2dbc = ${pekko.projection.r2dbc}
+    second-projection-r2dbc {
+      offset-store {
+        # specific projection offset store properties here
+      }
+      use-connection-factory = "second-r2dbc.connection-factory"
+    }
+    // #second-projection-config
+    
+    // #second-projection-config-with-connection-factory
+    second-projection-r2dbc = ${pekko.projection.r2dbc}
+    second-projection-r2dbc {
+      connection-factory = ${pekko.persistence.r2dbc.connection-factory}
+      connection-factory {
+        # specific connection properties for offset store and projection 
handler here 
+      }
+      
+      offset-store {
+        # specific projection offset store properties here
+      }
+      use-connection-factory = "second-projection-r2dbc.connection-factory"
+    }
+    // #second-projection-config-with-connection-factory
+    """
+
+    // #projectionSettings
+
+    import pekko.projection.ProjectionId
+    import pekko.projection.r2dbc.scaladsl.R2dbcProjection
+
+    val projectionId = ProjectionId("ShoppingCarts", 
s"carts-$minSlice-$maxSlice")
+
+    val settings = 
Some(R2dbcProjectionSettings(system.settings.config.getConfig("second-projection-r2dbc")))
+
+    val projection =
+      R2dbcProjection
+        .atLeastOnce(projectionId, settings = None, sourceProvider, handler = 
() => new ShoppingCartHandler)
+    // #projectionSettings
+  }
+
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to