This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-r2dbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 6e7e034  Configuring persistence plugins at runtime (#194)
6e7e034 is described below

commit 6e7e0342ce956a845f16a4b99f04edb311dea625
Author: Domantas Petrauskas <[email protected]>
AuthorDate: Fri Aug 15 15:11:33 2025 +0300

    Configuring persistence plugins at runtime (#194)
    
    * Extract events by persistence ID functionality from read journal, make 
journal and query plugins depend on it
    
    * Rework plugin implementations to enable runtime plugin configuration
    
    * Add manual lifecycle handling for connection pools created by connection 
factory provider
    
    * Simplify plugin settings for tables with schemas
    
    * Remove redundant connection factory provider method
    
    * Fix core tests
    
    * Add runtime plugin config test
    
    * Add query plugin assertions to runtime plugin config test
    
    * Improve structure of runtime plugin config test
    
    * Add state plugin test to runtime plugin config spec
    
    * Clean up state test in runtime plugin config spec
    
    * Make migration tool use journal connection factory settings
    
    * WIP implement improved connection factory provider
    
    * Implement improved connection factory provider
    
    * Fix core tests
    
    * Fix projection tests
    
    * Fix migration tool
    
    * Use separate connection factories for test harnesses
    
    * Import and formatting fixes
    
    * Update license
    
    * Break up shared settings
    
    * Break up shared config
    
    * Remove unnecessary comments
    
    * Add test for v1 config
    
    * Add binary incompatibility filters
    
    * Add documentation
    
    * Fix documentation example compilation
    
    * Fix documentation example compilation for Scala 3
    
    * Fix license headers
    
    * Refactor free specs to word specs
    
    * Remove no longer relevant TODOs/FIXMEs
---
 .github/workflows/build-test.yml                   |  12 +
 .../r2dbcsettings.excludes                         |  20 ++
 core/src/main/resources/reference.conf             | 127 ++++++--
 .../r2dbc/ConnectionFactoryProvider.scala          |  42 +--
 .../pekko/persistence/r2dbc/R2dbcSettings.scala    | 211 +++++++++++---
 .../persistence/r2dbc/internal/BySliceQuery.scala  |  40 +--
 .../r2dbc/internal/EventsByPersistenceIdDao.scala  | 151 ++++++++++
 .../r2dbc/internal/HighestSequenceNrDao.scala      |  73 +++++
 .../persistence/r2dbc/journal/JournalDao.scala     |  70 ++---
 .../persistence/r2dbc/journal/R2dbcJournal.scala   |  15 +-
 .../r2dbc/journal/mysql/MySQLJournalDao.scala      |  15 +-
 .../r2dbc/query/scaladsl/QueryDao.scala            |  76 ++---
 .../r2dbc/query/scaladsl/R2dbcReadJournal.scala    |  82 +-----
 .../r2dbc/query/scaladsl/mysql/MySQLQueryDao.scala |   8 +-
 .../r2dbc/snapshot/R2dbcSnapshotStore.scala        |  22 +-
 .../persistence/r2dbc/snapshot/SnapshotDao.scala   |  21 +-
 .../r2dbc/snapshot/mysql/MySQLSnapshotDao.scala    |   4 +-
 .../r2dbc/state/scaladsl/DurableStateDao.scala     |  27 +-
 .../state/scaladsl/R2dbcDurableStateStore.scala    |  10 +-
 .../scaladsl/mysql/MySQLDurableStateDao.scala      |   6 +-
 .../resources/config-v1.conf}                      |   0
 .../persistence/r2dbc/R2dbcSettingsSpec.scala      |  64 ++--
 .../pekko/persistence/r2dbc/TestConfig.scala       |  19 +-
 .../pekko/persistence/r2dbc/TestDbLifecycle.scala  |  33 ++-
 .../r2dbc/journal/PersistTagsSpec.scala            |   7 +-
 .../r2dbc/journal/PersistTimestampSpec.scala       |   5 +-
 .../r2dbc/journal/RuntimePluginConfigSpec.scala    | 321 +++++++++++++++++++++
 .../r2dbc/query/EventsByPersistenceIdSpec.scala    |   2 -
 .../query/EventsBySliceBacktrackingSpec.scala      |   8 +-
 .../r2dbc/query/EventsBySlicePubSubSpec.scala      |  13 +-
 .../r2dbc/query/EventsBySliceSpec.scala            |   2 -
 .../r2dbc/state/DurableStateBySliceSpec.scala      |   2 -
 .../r2dbc/state/DurableStateStoreSpec.scala        |   2 +-
 .../paradox/{connection-config.md => config.md}    |  14 +-
 docs/src/main/paradox/durable-state-store.md       |   2 +-
 docs/src/main/paradox/index.md                     |   2 +-
 docs/src/main/paradox/journal.md                   |   2 +-
 docs/src/main/paradox/projection.md                |   2 +-
 docs/src/main/paradox/query.md                     |   2 +-
 docs/src/main/paradox/snapshots.md                 |   2 +-
 .../docs/home/RuntimePluginConfigExample.scala     |  79 +++++
 .../r2dbc/migration/MigrationTool.scala            |  23 +-
 .../r2dbc/EventSourcedEndToEndSpec.scala           |  12 +-
 .../projection/r2dbc/EventSourcedPubSubSpec.scala  |  28 +-
 .../apache/pekko/projection/r2dbc/TestConfig.scala |   8 +-
 .../pekko/projection/r2dbc/TestDbLifecycle.scala   |  20 +-
 46 files changed, 1256 insertions(+), 450 deletions(-)

diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml
index 02b126a..a809c01 100644
--- a/.github/workflows/build-test.yml
+++ b/.github/workflows/build-test.yml
@@ -82,6 +82,10 @@ jobs:
           # TODO: could we poll the port instead of sleep?
           sleep 10
           docker exec -i docker-postgres-db-1 psql -U postgres -t < 
ddl-scripts/create_tables_postgres.sql
+          docker exec -i docker-postgres-db-1 psql -U postgres -t -c 'CREATE 
DATABASE database1;'
+          docker exec -i docker-postgres-db-1 psql -U postgres -t -d database1 
< ddl-scripts/create_tables_postgres.sql
+          docker exec -i docker-postgres-db-1 psql -U postgres -t -c 'CREATE 
DATABASE database2;'
+          docker exec -i docker-postgres-db-1 psql -U postgres -t -d database2 
< ddl-scripts/create_tables_postgres.sql
 
       - name: test
         run: sbt ++${{ matrix.SCALA_VERSION }} test
@@ -128,6 +132,10 @@ jobs:
           # TODO: could we poll the port instead of sleep?
           sleep 10
           docker exec -i yb-tserver-n1 /home/yugabyte/bin/ysqlsh -h 
yb-tserver-n1 -t < ddl-scripts/create_tables_yugabyte.sql
+          docker exec -i yb-tserver-n1 /home/yugabyte/bin/ysqlsh -h 
yb-tserver-n1 -t -c 'CREATE DATABASE database1;'
+          docker exec -i yb-tserver-n1 /home/yugabyte/bin/ysqlsh -h 
yb-tserver-n1 -t -d database1 < ddl-scripts/create_tables_yugabyte.sql
+          docker exec -i yb-tserver-n1 /home/yugabyte/bin/ysqlsh -h 
yb-tserver-n1 -t -c 'CREATE DATABASE database2;'
+          docker exec -i yb-tserver-n1 /home/yugabyte/bin/ysqlsh -h 
yb-tserver-n1 -t -d database2 < ddl-scripts/create_tables_yugabyte.sql
 
       - name: test
         run: sbt -Dpekko.persistence.r2dbc.dialect=yugabyte 
-Dpekko.projection.r2dbc.dialect=yugabyte ++${{ matrix.SCALA_VERSION }} test
@@ -183,6 +191,10 @@ jobs:
         run: |-
           docker compose -f docker/docker-compose-mysql.yml up -d --wait
           docker exec -i docker-mysql-db-1 mysql -h 127.0.0.1 --user=root 
--password=root --database=mysql < ddl-scripts/create_tables_mysql.sql
+          docker exec -i docker-mysql-db-1 mysql -h 127.0.0.1 --user=root 
--password=root -e 'CREATE SCHEMA database1;'
+          docker exec -i docker-mysql-db-1 mysql -h 127.0.0.1 --user=root 
--password=root --database=database1 < ddl-scripts/create_tables_mysql.sql
+          docker exec -i docker-mysql-db-1 mysql -h 127.0.0.1 --user=root 
--password=root -e 'CREATE SCHEMA database2;'
+          docker exec -i docker-mysql-db-1 mysql -h 127.0.0.1 --user=root 
--password=root --database=database2 < ddl-scripts/create_tables_mysql.sql
 
       - name: test
         run: sbt -Dpekko.persistence.r2dbc.dialect=mysql ++${{ 
matrix.SCALA_VERSION }} ${{ matrix.COMPILE_ONLY && 'Test/compile' || 'test' }}
diff --git 
a/core/src/main/mima-filters/1.1.x.backwards.excludes/r2dbcsettings.excludes 
b/core/src/main/mima-filters/1.1.x.backwards.excludes/r2dbcsettings.excludes
new file mode 100644
index 0000000..e36b41f
--- /dev/null
+++ b/core/src/main/mima-filters/1.1.x.backwards.excludes/r2dbcsettings.excludes
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements.  See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership.  The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License.  You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied.  See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Caused by https://github.com/apache/pekko-persistence-r2dbc/pull/194
+ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.persistence.r2dbc.R2dbcSettings")
+ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.persistence.r2dbc.R2dbcSettings$")
diff --git a/core/src/main/resources/reference.conf 
b/core/src/main/resources/reference.conf
index 1419a80..ba1eec8 100644
--- a/core/src/main/resources/reference.conf
+++ b/core/src/main/resources/reference.conf
@@ -11,8 +11,6 @@ pekko.persistence.r2dbc {
     # Otherwise it would be a pinned dispatcher, see 
https://github.com/akka/akka/issues/31058
     plugin-dispatcher = "pekko.actor.default-dispatcher"
 
-    # event replay is using pekko.persistence.r2dbc.query.buffer-size
-
     # Enable this to reduce latency of eventsBySlices. The persisted events 
will be
     # published as Pekko messages and consumed directly by running 
eventsBySlices
     # queries. Tradeoff is more CPU and network resources that are used. The 
events
@@ -22,6 +20,20 @@ pekko.persistence.r2dbc {
 
     # replay filter not needed for this plugin
     replay-filter.mode = off
+
+    dialect = ${pekko.persistence.r2dbc.dialect}
+
+    schema = ${pekko.persistence.r2dbc.schema}
+
+    use-connection-factory = ${pekko.persistence.r2dbc.use-connection-factory}
+
+    log-db-calls-exceeding = ${pekko.persistence.r2dbc.log-db-calls-exceeding}
+
+    buffer-size = ${pekko.persistence.r2dbc.buffer-size}
+
+    db-timestamp-monotonic-increasing = 
${pekko.persistence.r2dbc.db-timestamp-monotonic-increasing}
+
+    use-app-timestamp = ${pekko.persistence.r2dbc.use-app-timestamp}
   }
 }
 // #journal-settings
@@ -34,6 +46,14 @@ pekko.persistence.r2dbc {
 
     # Otherwise it would be a pinned dispatcher, see 
https://github.com/akka/akka/issues/31058
     plugin-dispatcher = "pekko.actor.default-dispatcher"
+
+    dialect = ${pekko.persistence.r2dbc.dialect}
+
+    schema = ${pekko.persistence.r2dbc.schema}
+
+    use-connection-factory = ${pekko.persistence.r2dbc.use-connection-factory}
+
+    log-db-calls-exceeding = ${pekko.persistence.r2dbc.log-db-calls-exceeding}
   }
 }
 // #snapshot-settings
@@ -50,6 +70,31 @@ pekko.persistence.r2dbc {
     # previous revision. There might be a small performance gain if
     # this is disabled.
     assert-single-writer = on
+
+    dialect = ${pekko.persistence.r2dbc.dialect}
+
+    schema = ${pekko.persistence.r2dbc.schema}
+
+    use-connection-factory = ${pekko.persistence.r2dbc.use-connection-factory}
+
+    log-db-calls-exceeding = ${pekko.persistence.r2dbc.log-db-calls-exceeding}
+
+    buffer-size = ${pekko.persistence.r2dbc.buffer-size}
+
+    refresh-interval = ${pekko.persistence.r2dbc.refresh-interval}
+
+    behind-current-time = ${pekko.persistence.r2dbc.behind-current-time}
+    backtracking {
+      enabled = ${pekko.persistence.r2dbc.backtracking.enabled}
+      window = ${pekko.persistence.r2dbc.backtracking.window}
+      behind-current-time = 
${pekko.persistence.r2dbc.backtracking.behind-current-time}
+    }
+
+    db-timestamp-monotonic-increasing = 
${pekko.persistence.r2dbc.db-timestamp-monotonic-increasing}
+
+    use-app-timestamp = ${pekko.persistence.r2dbc.use-app-timestamp}
+
+    persistence-ids.buffer-size = 
${pekko.persistence.r2dbc.persistence-ids.buffer-size}
   }
 }
 // #durable-state-settings
@@ -59,32 +104,9 @@ pekko.persistence.r2dbc {
   query {
     class = "org.apache.pekko.persistence.r2dbc.query.R2dbcReadJournalProvider"
 
-    # When live queries return no results or <= 10% of buffer-size, the next 
query
-    # to db will be delayed for this duration.
-    # When the number of rows from previous query is >= 90% of buffer-size, 
the next
-    # query will be emitted immediately.
-    # Otherwise, between 10% - 90% of buffer-size, the next query will be 
delayed
-    # for half of this duration.
-    refresh-interval = 3s
-
-    # Live queries read events up to this duration from the current database 
time.
-    behind-current-time = 100 millis
-
-    backtracking {
-      enabled = on
-      # Backtracking queries will look back for this amount of time. It should
-      # not be larger than the pekko.projection.r2dbc.offset-store.time-window.
-      window = 2 minutes
-      # Backtracking queries read events up to this duration from the current 
database time.
-      behind-current-time = 10 seconds
-    }
+    table = ${pekko.persistence.r2dbc.journal.table}
 
-    # In-memory buffer holding events when reading from database.
-    buffer-size = 1000
-
-    persistence-ids {
-      buffer-size = 1000
-    }
+    publish-events =  ${pekko.persistence.r2dbc.journal.publish-events}
 
     # When journal publish-events is enabled a best effort deduplication can 
be enabled by setting
     # this property to the size of the deduplication buffer in the 
`eventsBySlices` query.
@@ -94,6 +116,26 @@ pekko.persistence.r2dbc {
     # the backtracking queries.
     deduplicate-capacity = 0
 
+    dialect = ${pekko.persistence.r2dbc.dialect}
+
+    schema = ${pekko.persistence.r2dbc.schema}
+
+    use-connection-factory = ${pekko.persistence.r2dbc.use-connection-factory}
+
+    log-db-calls-exceeding = ${pekko.persistence.r2dbc.log-db-calls-exceeding}
+
+    buffer-size = ${pekko.persistence.r2dbc.buffer-size}
+
+    refresh-interval = ${pekko.persistence.r2dbc.refresh-interval}
+
+    behind-current-time = ${pekko.persistence.r2dbc.behind-current-time}
+    backtracking {
+      enabled = ${pekko.persistence.r2dbc.backtracking.enabled}
+      window = ${pekko.persistence.r2dbc.backtracking.window}
+      behind-current-time = 
${pekko.persistence.r2dbc.backtracking.behind-current-time}
+    }
+
+    persistence-ids.buffer-size = 
${pekko.persistence.r2dbc.persistence-ids.buffer-size}
   }
 }
 // #query-settings
@@ -177,6 +219,11 @@ pekko.persistence.r2dbc {
     connection-factory-options-customizer = ""
   }
 
+  # Fully qualified config path which holds the connection factory 
configuration.
+  # Connection factories are initialized using the config at this path and are 
identified by the value of this path.
+  # All persistence plugins use the same value by default, which allows 
sharing of a single connection factory between all of the plugins.
+  use-connection-factory = "pekko.persistence.r2dbc.connection-factory"
+
   # If database timestamp is guaranteed to not move backwards for two 
subsequent
   # updates of the same persistenceId there might be a performance gain to
   # set this to `on`. Note that many databases use the system clock and that 
can
@@ -192,5 +239,31 @@ pekko.persistence.r2dbc {
   # Set to 0 to log all calls.
   log-db-calls-exceeding = 300 ms
 
+  # In-memory buffer holding events when reading from database.
+  buffer-size = 1000
+
+  # When live queries return no results or <= 10% of buffer-size, the next 
query
+  # to db will be delayed for this duration.
+  # When the number of rows from previous query is >= 90% of buffer-size, the 
next
+  # query will be emitted immediately.
+  # Otherwise, between 10% - 90% of buffer-size, the next query will be delayed
+  # for half of this duration.
+  refresh-interval = 3s
+
+  # Live queries read events up to this duration from the current database 
time.
+  behind-current-time = 100 millis
+
+  backtracking {
+    enabled = on
+    # Backtracking queries will look back for this amount of time. It should
+    # not be larger than the pekko.projection.r2dbc.offset-store.time-window.
+    window = 2 minutes
+    # Backtracking queries read events up to this duration from the current 
database time.
+    behind-current-time = 10 seconds
+  }
+
+  persistence-ids {
+    buffer-size = 1000
+  }
 }
 // #connection-settings
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/ConnectionFactoryProvider.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/ConnectionFactoryProvider.scala
index 612e765..b6a5e5d 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/ConnectionFactoryProvider.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/ConnectionFactoryProvider.scala
@@ -18,16 +18,8 @@ import java.util.concurrent.ConcurrentHashMap
 
 import scala.concurrent.Future
 import scala.concurrent.duration.Duration
-import scala.util.{ Failure, Success }
-import com.typesafe.config.Config
-import io.r2dbc.pool.ConnectionPool
-import io.r2dbc.pool.ConnectionPoolConfiguration
-import io.r2dbc.postgresql.PostgresqlConnectionFactoryProvider
-import io.r2dbc.postgresql.client.SSLMode
-import io.r2dbc.spi.ConnectionFactories
-import io.r2dbc.spi.ConnectionFactory
-import io.r2dbc.spi.ConnectionFactoryOptions
-import io.r2dbc.spi.Option
+import scala.util.Failure
+import scala.util.Success
 import org.apache.pekko
 import pekko.Done
 import pekko.actor.CoordinatedShutdown
@@ -36,8 +28,18 @@ import pekko.actor.typed.Extension
 import pekko.actor.typed.ExtensionId
 import 
pekko.persistence.r2dbc.ConnectionFactoryProvider.ConnectionFactoryOptionsCustomizer
 import pekko.persistence.r2dbc.ConnectionFactoryProvider.NoopCustomizer
-import pekko.persistence.r2dbc.internal.R2dbcExecutor
+import pekko.persistence.r2dbc.internal.R2dbcExecutor.PublisherOps
 import pekko.util.ccompat.JavaConverters._
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
+import io.r2dbc.pool.ConnectionPool
+import io.r2dbc.pool.ConnectionPoolConfiguration
+import io.r2dbc.postgresql.PostgresqlConnectionFactoryProvider
+import io.r2dbc.postgresql.client.SSLMode
+import io.r2dbc.spi.ConnectionFactories
+import io.r2dbc.spi.ConnectionFactory
+import io.r2dbc.spi.ConnectionFactoryOptions
+import io.r2dbc.spi.Option
 
 object ConnectionFactoryProvider extends 
ExtensionId[ConnectionFactoryProvider] {
   def createExtension(system: ActorSystem[_]): ConnectionFactoryProvider = new 
ConnectionFactoryProvider(system)
@@ -75,7 +77,6 @@ object ConnectionFactoryProvider extends 
ExtensionId[ConnectionFactoryProvider]
 
 class ConnectionFactoryProvider(system: ActorSystem[_]) extends Extension {
 
-  import R2dbcExecutor.PublisherOps
   private val sessions = new ConcurrentHashMap[String, ConnectionPool]
 
   CoordinatedShutdown(system)
@@ -86,15 +87,20 @@ class ConnectionFactoryProvider(system: ActorSystem[_]) 
extends Extension {
         .map(_ => Done)
     }
 
-  def connectionFactoryFor(configLocation: String): ConnectionFactory = {
+  def connectionFactoryFor(configPath: String): ConnectionFactory = {
+    connectionFactoryFor(configPath, ConfigFactory.empty())
+  }
+
+  def connectionFactoryFor(configPath: String, config: Config): 
ConnectionFactory = {
     sessions
       .computeIfAbsent(
-        configLocation,
-        configLocation => {
-          val config = system.settings.config.getConfig(configLocation)
-          val settings = new ConnectionFactorySettings(config)
+        configPath,
+        _ => {
+          val fullConfig = config.withFallback(system.settings.config)
+          val settings =
+            new ConnectionFactorySettings(fullConfig.getConfig(configPath))
           val customizer = createConnectionFactoryOptionsCustomizer(settings)
-          createConnectionPoolFactory(settings, customizer, config)
+          createConnectionPoolFactory(settings, customizer, fullConfig)
         })
       .asInstanceOf[ConnectionFactory]
   }
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala
index 5cbfbff..9a67a80 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala
@@ -16,107 +16,219 @@ package org.apache.pekko.persistence.r2dbc
 import java.util.Locale
 
 import scala.concurrent.duration._
-
 import org.apache.pekko
 import pekko.annotation.InternalApi
 import pekko.annotation.InternalStableApi
+import pekko.util.Helpers.toRootLowerCase
 import pekko.util.JavaDurationConverters._
 import com.typesafe.config.Config
-import pekko.util.Helpers.toRootLowerCase
 
 /**
  * INTERNAL API
  */
 @InternalStableApi
-object R2dbcSettings {
-  def apply(config: Config): R2dbcSettings =
-    new R2dbcSettings(config)
+sealed trait Dialect
+
+/**
+ * INTERNAL API
+ */
+@InternalStableApi
+object Dialect {
+  case object Postgres extends Dialect
+  case object Yugabyte extends Dialect
+
+  /** @since 1.1.0 */
+  case object MySQL extends Dialect
+
+  /** @since 1.1.0 */
+  def fromString(value: String): Dialect = {
+    toRootLowerCase(value) match {
+      case "yugabyte" => Dialect.Yugabyte
+      case "postgres" => Dialect.Postgres
+      case "mysql"    => Dialect.MySQL
+      case other =>
+        throw new IllegalArgumentException(
+          s"Unknown dialect [$other]. Supported dialects are [yugabyte, 
postgres, mysql].")
+    }
+  }
 }
 
 /**
  * INTERNAL API
  */
 @InternalStableApi
-final class R2dbcSettings(config: Config) {
-  val schema: Option[String] = 
Option(config.getString("schema")).filterNot(_.trim.isEmpty)
+final class JournalSettings(val config: Config) extends ConnectionSettings 
with UseConnnectionFactory with BufferSize
+    with JournalPublishEvents with DbTimestampMonotonicIncreasing with 
UseAppTimestamp {
 
-  val journalTable: String = config.getString("journal.table")
+  val journalTable: String = config.getString("table")
   val journalTableWithSchema: String = schema.map(_ + ".").getOrElse("") + 
journalTable
+}
 
-  val journalPublishEvents: Boolean = 
config.getBoolean("journal.publish-events")
+/**
+ * INTERNAL API
+ */
+@InternalStableApi
+object JournalSettings {
+  def apply(config: Config): JournalSettings =
+    new JournalSettings(config)
+}
+
+/**
+ * INTERNAL API
+ */
+@InternalStableApi
+final class SnapshotSettings(val config: Config) extends ConnectionSettings 
with UseConnnectionFactory {
 
-  val snapshotsTable: String = config.getString("snapshot.table")
+  val snapshotsTable: String = config.getString("table")
   val snapshotsTableWithSchema: String = schema.map(_ + ".").getOrElse("") + 
snapshotsTable
+}
+
+/**
+ * INTERNAL API
+ */
+@InternalStableApi
+object SnapshotSettings {
+  def apply(config: Config): SnapshotSettings =
+    new SnapshotSettings(config)
+}
 
-  val durableStateTable: String = config.getString("state.table")
+/**
+ * INTERNAL API
+ */
+@InternalStableApi
+final class StateSettings(val config: Config) extends ConnectionSettings with 
UseConnnectionFactory with BufferSize
+    with RefreshInterval with BySliceQuerySettings with 
DbTimestampMonotonicIncreasing with PersistenceIdsQuerySettings
+    with UseAppTimestamp {
+
+  val durableStateTable: String = config.getString("table")
   val durableStateTableWithSchema: String = schema.map(_ + ".").getOrElse("") 
+ durableStateTable
 
-  val durableStateAssertSingleWriter: Boolean = 
config.getBoolean("state.assert-single-writer")
+  val durableStateAssertSingleWriter: Boolean = 
config.getBoolean("assert-single-writer")
+}
 
-  val dialect: Dialect = Dialect.fromString(config.getString("dialect"))
+/**
+ * INTERNAL API
+ */
+@InternalStableApi
+object StateSettings {
+  def apply(config: Config): StateSettings =
+    new StateSettings(config)
+}
 
-  val querySettings = new QuerySettings(config.getConfig("query"))
+/**
+ * INTERNAL API
+ */
+@InternalStableApi
+final class QuerySettings(val config: Config) extends ConnectionSettings with 
UseConnnectionFactory with BufferSize
+    with RefreshInterval with BySliceQuerySettings with JournalPublishEvents 
with PersistenceIdsQuerySettings {
 
-  val connectionFactorySettings = new 
ConnectionFactorySettings(config.getConfig("connection-factory"))
+  val journalTable: String = config.getString("table")
+  val journalTableWithSchema: String = schema.map(_ + ".").getOrElse("") + 
journalTable
 
-  val dbTimestampMonotonicIncreasing: Boolean = 
config.getBoolean("db-timestamp-monotonic-increasing")
+  val deduplicateCapacity: Int = config.getInt("deduplicate-capacity")
+}
 
-  /**
-   * INTERNAL API FIXME remove when 
https://github.com/yugabyte/yugabyte-db/issues/10995 has been resolved
-   */
-  @InternalApi private[pekko] val useAppTimestamp: Boolean = 
config.getBoolean("use-app-timestamp")
+/**
+ * INTERNAL API
+ */
+@InternalStableApi
+object QuerySettings {
+  def apply(config: Config): QuerySettings =
+    new QuerySettings(config)
+}
+
+/**
+ * INTERNAL API
+ */
+@InternalStableApi
+trait ConnectionSettings {
+  def config: Config
+
+  val dialect: Dialect = Dialect.fromString(config.getString("dialect"))
+
+  val schema: Option[String] = 
Option(config.getString("schema")).filterNot(_.trim.isEmpty)
 
   val logDbCallsExceeding: FiniteDuration =
     config.getString("log-db-calls-exceeding").toLowerCase(Locale.ROOT) match {
       case "off" => -1.millis
       case _     => config.getDuration("log-db-calls-exceeding").asScala
     }
+}
 
+/**
+ * INTERNAL API
+ */
+@InternalStableApi
+trait JournalPublishEvents {
+  def config: Config
+
+  val journalPublishEvents: Boolean = config.getBoolean("publish-events")
 }
 
 /**
  * INTERNAL API
  */
 @InternalStableApi
-sealed trait Dialect
+trait DbTimestampMonotonicIncreasing {
+  def config: Config
+
+  val dbTimestampMonotonicIncreasing: Boolean = 
config.getBoolean("db-timestamp-monotonic-increasing")
+}
 
 /**
  * INTERNAL API
  */
 @InternalStableApi
-object Dialect {
-  case object Postgres extends Dialect
-  case object Yugabyte extends Dialect
+trait UseAppTimestamp {
+  def config: Config
 
-  /** @since 1.1.0 */
-  case object MySQL extends Dialect
+  /**
+   * INTERNAL API FIXME remove when 
https://github.com/yugabyte/yugabyte-db/issues/10995 has been resolved
+   */
+  @InternalApi private[pekko] val useAppTimestamp: Boolean = 
config.getBoolean("use-app-timestamp")
+}
 
-  /** @since 1.1.0 */
-  def fromString(value: String): Dialect = {
-    toRootLowerCase(value) match {
-      case "yugabyte" => Dialect.Yugabyte
-      case "postgres" => Dialect.Postgres
-      case "mysql"    => Dialect.MySQL
-      case other =>
-        throw new IllegalArgumentException(
-          s"Unknown dialect [$other]. Supported dialects are [yugabyte, 
postgres, mysql].")
-    }
-  }
+/**
+ * INTERNAL API
+ */
+@InternalStableApi
+trait BufferSize {
+  def config: Config
+
+  val bufferSize: Int = config.getInt("buffer-size")
 }
 
 /**
  * INTERNAL API
  */
 @InternalStableApi
-final class QuerySettings(config: Config) {
+trait RefreshInterval {
+  def config: Config
+
   val refreshInterval: FiniteDuration = 
config.getDuration("refresh-interval").asScala
+}
+
+/**
+ * INTERNAL API
+ */
+@InternalStableApi
+trait BySliceQuerySettings {
+  def config: Config
+
   val behindCurrentTime: FiniteDuration = 
config.getDuration("behind-current-time").asScala
   val backtrackingEnabled: Boolean = config.getBoolean("backtracking.enabled")
   val backtrackingWindow: FiniteDuration = 
config.getDuration("backtracking.window").asScala
   val backtrackingBehindCurrentTime: FiniteDuration = 
config.getDuration("backtracking.behind-current-time").asScala
-  val bufferSize: Int = config.getInt("buffer-size")
+}
+
+/**
+ * INTERNAL API
+ */
+@InternalStableApi
+trait PersistenceIdsQuerySettings {
+  def config: Config
+
   val persistenceIdsBufferSize: Int = 
config.getInt("persistence-ids.buffer-size")
-  val deduplicateCapacity: Int = config.getInt("deduplicate-capacity")
 }
 
 /**
@@ -156,3 +268,22 @@ final class ConnectionFactorySettings(config: Config) {
   val connectionFactoryOptionsCustomizer: Option[String] =
     
Option(config.getString("connection-factory-options-customizer")).filter(_.trim.nonEmpty)
 }
+
+/**
+ * INTERNAL API
+ */
+@InternalStableApi
+object ConnectionFactorySettings {
+  def apply(config: Config): ConnectionFactorySettings =
+    new ConnectionFactorySettings(config)
+}
+
+/**
+ * INTERNAL API
+ */
+@InternalStableApi
+trait UseConnnectionFactory {
+  def config: Config
+
+  val useConnectionFactory: String = config.getString("use-connection-factory")
+}
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQuery.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQuery.scala
index cbd2f70..f3bacbc 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQuery.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQuery.scala
@@ -13,22 +13,23 @@
 
 package org.apache.pekko.persistence.r2dbc.internal
 
-import scala.collection.immutable
 import java.time.Instant
 import java.time.{ Duration => JDuration }
 
 import scala.annotation.tailrec
+import scala.collection.immutable
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.concurrent.duration.Duration
 import scala.concurrent.duration.FiniteDuration
-
 import org.apache.pekko
 import pekko.NotUsed
 import pekko.annotation.InternalApi
 import pekko.persistence.query.Offset
 import pekko.persistence.query.TimestampOffset
-import pekko.persistence.r2dbc.R2dbcSettings
+import pekko.persistence.r2dbc.BufferSize
+import pekko.persistence.r2dbc.BySliceQuerySettings
+import pekko.persistence.r2dbc.RefreshInterval
 import pekko.persistence.r2dbc.internal.BySliceQuery.Buckets.Bucket
 import pekko.stream.scaladsl.Flow
 import pekko.stream.scaladsl.Source
@@ -98,7 +99,10 @@ import org.slf4j.Logger
    *   Key is the epoch seconds for the start of the bucket. Value is the 
number of entries in the bucket.
    */
   class Buckets(countByBucket: immutable.SortedMap[Buckets.EpochSeconds, 
Buckets.Count]) {
-    import Buckets.{ Bucket, BucketDurationSeconds, Count, EpochSeconds }
+    import Buckets.Bucket
+    import Buckets.BucketDurationSeconds
+    import Buckets.Count
+    import Buckets.EpochSeconds
 
     val createdAt: Instant = Instant.now()
 
@@ -189,15 +193,15 @@ import org.slf4j.Logger
     dao: BySliceQuery.Dao[Row],
     createEnvelope: (TimestampOffset, Row) => Envelope,
     extractOffset: Envelope => TimestampOffset,
-    settings: R2dbcSettings,
+    settings: BySliceQuerySettings with RefreshInterval with BufferSize, // 
compound type: the settings object must mix in all three traits
     log: Logger)(implicit val ec: ExecutionContext) {
   import BySliceQuery._
   import TimestampOffset.toTimestampOffset
 
-  private val backtrackingWindow = 
JDuration.ofMillis(settings.querySettings.backtrackingWindow.toMillis)
+  private val backtrackingWindow = 
JDuration.ofMillis(settings.backtrackingWindow.toMillis)
   private val halfBacktrackingWindow = backtrackingWindow.dividedBy(2)
   private val firstBacktrackingQueryWindow =
-    
backtrackingWindow.plus(JDuration.ofMillis(settings.querySettings.backtrackingBehindCurrentTime.toMillis))
+    
backtrackingWindow.plus(JDuration.ofMillis(settings.backtrackingBehindCurrentTime.toMillis))
   private val eventBucketCountInterval = JDuration.ofSeconds(60)
 
   def currentBySlices(
@@ -218,7 +222,7 @@ import org.slf4j.Logger
       if (state.queryCount == 0L || state.rowCount > 0) {
         val newState = state.copy(rowCount = 0, queryCount = state.queryCount 
+ 1)
 
-        val toTimestamp = 
newState.nextQueryToTimestamp(settings.querySettings.bufferSize) match {
+        val toTimestamp = newState.nextQueryToTimestamp(settings.bufferSize) 
match {
           case Some(t) =>
             if (t.isBefore(endTimestamp)) t else endTimestamp
           case None =>
@@ -323,8 +327,8 @@ import org.slf4j.Logger
       } else {
         val delay = ContinuousQuery.adjustNextDelay(
           state.rowCount,
-          settings.querySettings.bufferSize,
-          settings.querySettings.refreshInterval)
+          settings.bufferSize,
+          settings.refreshInterval)
 
         if (log.isDebugEnabled)
           delay.foreach { d =>
@@ -342,13 +346,13 @@ import org.slf4j.Logger
     }
 
     def switchFromBacktracking(state: QueryState): Boolean = {
-      state.backtracking && state.rowCount < settings.querySettings.bufferSize 
- 1
+      state.backtracking && state.rowCount < settings.bufferSize - 1
     }
 
     def nextQuery(state: QueryState): (QueryState, Option[Source[Envelope, 
NotUsed]]) = {
       val newIdleCount = if (state.rowCount == 0) state.idleCount + 1 else 0
       val newState =
-        if (settings.querySettings.backtrackingEnabled && !state.backtracking 
&& state.latest != TimestampOffset.Zero &&
+        if (settings.backtrackingEnabled && !state.backtracking && 
state.latest != TimestampOffset.Zero &&
           (newIdleCount >= 5 || JDuration
             .between(state.latestBacktracking.timestamp, 
state.latest.timestamp)
             .compareTo(halfBacktrackingWindow) > 0)) {
@@ -376,11 +380,11 @@ import org.slf4j.Logger
         }
 
       val behindCurrentTime =
-        if (newState.backtracking) 
settings.querySettings.backtrackingBehindCurrentTime
-        else settings.querySettings.behindCurrentTime
+        if (newState.backtracking) settings.backtrackingBehindCurrentTime
+        else settings.behindCurrentTime
 
       val fromTimestamp = newState.nextQueryFromTimestamp
-      val toTimestamp = 
newState.nextQueryToTimestamp(settings.querySettings.bufferSize)
+      val toTimestamp = newState.nextQueryToTimestamp(settings.bufferSize)
 
       if (log.isDebugEnabled())
         log.debug(
@@ -431,7 +435,7 @@ import org.slf4j.Logger
       // For Durable State we always refresh the bucket counts at the 
interval. For Event Sourced we know
       // that they don't change because events are append only.
       (dao.countBucketsMayChange || state.buckets
-        .findTimeForLimit(state.latest.timestamp, 
settings.querySettings.bufferSize)
+        .findTimeForLimit(state.latest.timestamp, settings.bufferSize)
         .isEmpty)) {
 
       val fromTimestamp =
@@ -475,9 +479,9 @@ import org.slf4j.Logger
         if (row.dbTimestamp == currentTimestamp) {
           // has this already been seen?
           if (currentSequenceNrs.get(row.persistenceId).exists(_ >= 
row.seqNr)) {
-            if (currentSequenceNrs.size >= settings.querySettings.bufferSize) {
+            if (currentSequenceNrs.size >= settings.bufferSize) {
               throw new IllegalStateException(
-                s"Too many events stored with the same timestamp 
[$currentTimestamp], buffer size [${settings.querySettings.bufferSize}]")
+                s"Too many events stored with the same timestamp 
[$currentTimestamp], buffer size [${settings.bufferSize}]")
             }
             log.trace(
               "filtering [{}] [{}] as db timestamp is the same as last offset 
and is in seen [{}]",
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/EventsByPersistenceIdDao.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/EventsByPersistenceIdDao.scala
new file mode 100644
index 0000000..4099091
--- /dev/null
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/EventsByPersistenceIdDao.scala
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.r2dbc.internal
+
+import java.time.Instant
+
+import org.apache.pekko
+import pekko.NotUsed
+import pekko.annotation.InternalApi
+import pekko.persistence.r2dbc.BufferSize
+import pekko.persistence.r2dbc.Dialect
+import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
+import pekko.persistence.r2dbc.journal.JournalDao.SerializedJournalRow
+import pekko.persistence.r2dbc.journal.JournalDao.readMetadata
+import pekko.stream.scaladsl.Source
+import org.slf4j.LoggerFactory
+import scala.concurrent.ExecutionContext
+
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[r2dbc] object EventsByPersistenceIdDao {
+
+  private val log = LoggerFactory.getLogger(classOf[EventsByPersistenceIdDao])
+
+  private final case class ByPersistenceIdState(queryCount: Int, rowCount: 
Int, latestSeqNr: Long)
+}
+
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[r2dbc] trait EventsByPersistenceIdDao {
+  import EventsByPersistenceIdDao._
+
+  implicit protected def ec: ExecutionContext
+
+  implicit protected def dialect: Dialect
+  protected def statementTimestampSql: String
+
+  protected def journalTable: String
+
+  protected def r2dbcExecutor: R2dbcExecutor
+
+  protected def settings: BufferSize
+
+  private lazy val selectEventsSql = sql"""
+    SELECT slice, entity_type, persistence_id, seq_nr, db_timestamp, 
$statementTimestampSql AS read_db_timestamp, event_ser_id, event_ser_manifest, 
event_payload, writer, adapter_manifest, meta_ser_id, meta_ser_manifest, 
meta_payload
+    from $journalTable
+    WHERE persistence_id = ? AND seq_nr >= ? AND seq_nr <= ?
+    AND deleted = false
+    ORDER BY seq_nr
+    LIMIT ?"""
+
+  /**
+   * INTERNAL API: Used by both journal replay and currentEventsByPersistenceId
+   */
+  @InternalApi private[r2dbc] def internalEventsByPersistenceId(
+      persistenceId: String,
+      fromSequenceNr: Long,
+      toSequenceNr: Long): Source[SerializedJournalRow, NotUsed] = {
+    def updateState(state: ByPersistenceIdState, row: SerializedJournalRow): 
ByPersistenceIdState =
+      state.copy(rowCount = state.rowCount + 1, latestSeqNr = row.seqNr)
+
+    def nextQuery(
+        state: ByPersistenceIdState,
+        highestSeqNr: Long): (ByPersistenceIdState, 
Option[Source[SerializedJournalRow, NotUsed]]) = {
+      if (state.queryCount == 0L || state.rowCount >= settings.bufferSize) {
+        val newState = state.copy(rowCount = 0, queryCount = state.queryCount 
+ 1)
+
+        if (state.queryCount != 0 && log.isDebugEnabled())
+          log.debug(
+            "currentEventsByPersistenceId query [{}] for persistenceId [{}], 
from [{}] to [{}]. Found [{}] rows in previous query.",
+            state.queryCount: java.lang.Integer,
+            persistenceId,
+            state.latestSeqNr + 1: java.lang.Long,
+            highestSeqNr: java.lang.Long,
+            state.rowCount: java.lang.Integer)
+
+        newState -> Some(eventsByPersistenceId(persistenceId, 
state.latestSeqNr + 1, highestSeqNr))
+      } else {
+        log.debug(
+          "currentEventsByPersistenceId query [{}] for persistenceId [{}] 
completed. Found [{}] rows in previous query.",
+          state.queryCount: java.lang.Integer,
+          persistenceId,
+          state.rowCount: java.lang.Integer)
+
+        state -> None
+      }
+    }
+
+    if (log.isDebugEnabled())
+      log.debug(
+        "currentEventsByPersistenceId query for persistenceId [{}], from [{}] 
to [{}].",
+        persistenceId,
+        fromSequenceNr: java.lang.Long,
+        toSequenceNr: java.lang.Long)
+
+    ContinuousQuery[ByPersistenceIdState, SerializedJournalRow](
+      initialState = ByPersistenceIdState(0, 0, latestSeqNr = fromSequenceNr - 
1),
+      updateState = updateState,
+      delayNextQuery = _ => None,
+      nextQuery = state => nextQuery(state, toSequenceNr))
+  }
+
+  def eventsByPersistenceId(
+      persistenceId: String,
+      fromSequenceNr: Long,
+      toSequenceNr: Long): Source[SerializedJournalRow, NotUsed] = {
+
+    val result = r2dbcExecutor.select(s"select eventsByPersistenceId 
[$persistenceId]")(
+      connection =>
+        connection
+          .createStatement(selectEventsSql)
+          .bind(0, persistenceId)
+          .bind(1, fromSequenceNr)
+          .bind(2, toSequenceNr)
+          .bind(3, settings.bufferSize),
+      row =>
+        SerializedJournalRow(
+          slice = row.get[Integer]("slice", classOf[Integer]),
+          entityType = row.get("entity_type", classOf[String]),
+          persistenceId = row.get("persistence_id", classOf[String]),
+          seqNr = row.get[java.lang.Long]("seq_nr", classOf[java.lang.Long]),
+          dbTimestamp = row.get("db_timestamp", classOf[Instant]),
+          readDbTimestamp = row.get("read_db_timestamp", classOf[Instant]),
+          payload = Some(row.get("event_payload", classOf[Array[Byte]])),
+          serId = row.get[Integer]("event_ser_id", classOf[Integer]),
+          serManifest = row.get("event_ser_manifest", classOf[String]),
+          writerUuid = row.get("writer", classOf[String]),
+          tags = Set.empty, // tags not fetched in queries (yet)
+          metadata = readMetadata(row)))
+
+    if (log.isDebugEnabled)
+      result.foreach(rows => log.debug("Read [{}] events for persistenceId 
[{}]", rows.size, persistenceId))
+
+    Source.futureSource(result.map(Source(_))).mapMaterializedValue(_ => 
NotUsed)
+  }
+}
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/HighestSequenceNrDao.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/HighestSequenceNrDao.scala
new file mode 100644
index 0000000..b1aa52f
--- /dev/null
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/HighestSequenceNrDao.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.r2dbc.internal
+
+import org.apache.pekko
+import pekko.annotation.InternalApi
+import pekko.dispatch.ExecutionContexts
+import pekko.persistence.r2dbc.Dialect
+import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
+import org.slf4j.LoggerFactory
+
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[r2dbc] object HighestSequenceNrDao {
+
+  private val log = LoggerFactory.getLogger(classOf[HighestSequenceNrDao])
+}
+
+/**
+ * INTERNAL API
+ */
+@InternalApi
+trait HighestSequenceNrDao {
+  import HighestSequenceNrDao._
+
+  implicit protected def ec: ExecutionContext
+
+  implicit protected def dialect: Dialect
+
+  protected def journalTable: String
+
+  protected def r2dbcExecutor: R2dbcExecutor
+
+  private lazy val selectHighestSequenceNrSql = sql"""
+    SELECT MAX(seq_nr) from $journalTable
+    WHERE persistence_id = ? AND seq_nr >= ?"""
+
+  def readHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): 
Future[Long] = {
+    val result = r2dbcExecutor
+      .select(s"select highest seqNr [$persistenceId]")(
+        connection =>
+          connection
+            .createStatement(selectHighestSequenceNrSql)
+            .bind(0, persistenceId)
+            .bind(1, fromSequenceNr),
+        row => {
+          val seqNr = row.get[java.lang.Long](0, classOf[java.lang.Long])
+          if (seqNr eq null) 0L else seqNr.longValue
+        })
+      .map(r => if (r.isEmpty) 0L else r.head)(ExecutionContexts.parasitic)
+
+    if (log.isDebugEnabled)
+      result.foreach(seqNr => log.debug("Highest sequence nr for persistenceId 
[{}]: [{}]", persistenceId, seqNr))
+
+    result
+  }
+}
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala
index 3c881b6..c06516d 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala
@@ -17,9 +17,6 @@ import java.time.Instant
 
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
-import io.r2dbc.spi.ConnectionFactory
-import io.r2dbc.spi.Row
-import io.r2dbc.spi.Statement
 import org.apache.pekko
 import pekko.actor.typed.ActorSystem
 import pekko.annotation.InternalApi
@@ -27,12 +24,18 @@ import pekko.dispatch.ExecutionContexts
 import pekko.persistence.Persistence
 import pekko.persistence.r2dbc.ConnectionFactoryProvider
 import pekko.persistence.r2dbc.Dialect
-import pekko.persistence.r2dbc.R2dbcSettings
+import pekko.persistence.r2dbc.JournalSettings
 import pekko.persistence.r2dbc.internal.BySliceQuery
+import pekko.persistence.r2dbc.internal.EventsByPersistenceIdDao
+import pekko.persistence.r2dbc.internal.HighestSequenceNrDao
 import pekko.persistence.r2dbc.internal.R2dbcExecutor
 import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
 import pekko.persistence.r2dbc.journal.mysql.MySQLJournalDao
 import pekko.persistence.typed.PersistenceId
+import com.typesafe.config.Config
+import io.r2dbc.spi.ConnectionFactory
+import io.r2dbc.spi.Row
+import io.r2dbc.spi.Statement
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
 
@@ -74,16 +77,16 @@ private[r2dbc] object JournalDao {
   }
 
   def fromConfig(
-      journalSettings: R2dbcSettings,
-      sharedConfigPath: String
+      settings: JournalSettings,
+      config: Config
   )(implicit system: ActorSystem[_], ec: ExecutionContext): JournalDao = {
     val connectionFactory =
-      ConnectionFactoryProvider(system).connectionFactoryFor(sharedConfigPath 
+ ".connection-factory")
-    journalSettings.dialect match {
+      
ConnectionFactoryProvider(system).connectionFactoryFor(settings.useConnectionFactory,
 config)
+    settings.dialect match {
       case Dialect.Postgres | Dialect.Yugabyte =>
-        new JournalDao(journalSettings, connectionFactory)
+        new JournalDao(settings, connectionFactory)
       case Dialect.MySQL =>
-        new MySQLJournalDao(journalSettings, connectionFactory)
+        new MySQLJournalDao(settings, connectionFactory)
     }
   }
 }
@@ -94,22 +97,21 @@ private[r2dbc] object JournalDao {
  * Class for doing db interaction outside of an actor to avoid mistakes in 
future callbacks
  */
 @InternalApi
-private[r2dbc] class JournalDao(journalSettings: R2dbcSettings, 
connectionFactory: ConnectionFactory)(
-    implicit
-    ec: ExecutionContext,
-    system: ActorSystem[_]) {
-
+private[r2dbc] class JournalDao(val settings: JournalSettings, 
connectionFactory: ConnectionFactory)(
+    implicit val ec: ExecutionContext, system: ActorSystem[_]) extends 
EventsByPersistenceIdDao
+    with HighestSequenceNrDao {
   import JournalDao.SerializedJournalRow
   import JournalDao.log
 
-  implicit protected val dialect: Dialect = journalSettings.dialect
+  implicit protected val dialect: Dialect = settings.dialect
   protected lazy val timestampSql: String = "transaction_timestamp()"
+  protected lazy val statementTimestampSql: String = "statement_timestamp()"
 
   private val persistenceExt = Persistence(system)
 
-  private val r2dbcExecutor = new R2dbcExecutor(connectionFactory, log, 
journalSettings.logDbCallsExceeding)(ec, system)
+  protected val r2dbcExecutor = new R2dbcExecutor(connectionFactory, log, 
settings.logDbCallsExceeding)(ec, system)
 
-  protected val journalTable: String = journalSettings.journalTableWithSchema
+  protected val journalTable: String = settings.journalTableWithSchema
 
   protected val (insertEventWithParameterTimestampSql: String, 
insertEventWithTransactionTimestampSql: String) = {
     val baseSql =
@@ -125,14 +127,14 @@ private[r2dbc] class JournalDao(journalSettings: 
R2dbcSettings, connectionFactor
       "WHERE persistence_id = ? AND seq_nr = ?)"
 
     val insertEventWithParameterTimestampSql = {
-      if (journalSettings.dbTimestampMonotonicIncreasing)
+      if (settings.dbTimestampMonotonicIncreasing)
         sql"$baseSql ?) RETURNING db_timestamp"
       else
         sql"$baseSql GREATEST(?, $timestampSubSelect)) RETURNING db_timestamp"
     }
 
     val insertEventWithTransactionTimestampSql = {
-      if (journalSettings.dbTimestampMonotonicIncreasing)
+      if (settings.dbTimestampMonotonicIncreasing)
         sql"$baseSql transaction_timestamp()) RETURNING db_timestamp"
       else
         sql"$baseSql GREATEST(transaction_timestamp(), $timestampSubSelect)) 
RETURNING db_timestamp"
@@ -141,10 +143,6 @@ private[r2dbc] class JournalDao(journalSettings: 
R2dbcSettings, connectionFactor
     (insertEventWithParameterTimestampSql, 
insertEventWithTransactionTimestampSql)
   }
 
-  private val selectHighestSequenceNrSql = sql"""
-    SELECT MAX(seq_nr) from $journalTable
-    WHERE persistence_id = ? AND seq_nr >= ?"""
-
   private val deleteEventsSql = sql"""
     DELETE FROM $journalTable
     WHERE persistence_id = ? AND seq_nr <= ?"""
@@ -205,12 +203,12 @@ private[r2dbc] class JournalDao(journalSettings: 
R2dbcSettings, connectionFactor
       }
 
       if (useTimestampFromDb) {
-        if (!journalSettings.dbTimestampMonotonicIncreasing)
+        if (!settings.dbTimestampMonotonicIncreasing)
           stmt
             .bind(13, write.persistenceId)
             .bind(14, previousSeqNr)
       } else {
-        if (journalSettings.dbTimestampMonotonicIncreasing)
+        if (settings.dbTimestampMonotonicIncreasing)
           stmt
             .bind(13, write.dbTimestamp)
         else
@@ -263,26 +261,6 @@ private[r2dbc] class JournalDao(journalSettings: 
R2dbcSettings, connectionFactor
     }
   }
 
-  def readHighestSequenceNr(persistenceId: String, fromSequenceNr: Long): 
Future[Long] = {
-    val result = r2dbcExecutor
-      .select(s"select highest seqNr [$persistenceId]")(
-        connection =>
-          connection
-            .createStatement(selectHighestSequenceNrSql)
-            .bind(0, persistenceId)
-            .bind(1, fromSequenceNr),
-        row => {
-          val seqNr = row.get[java.lang.Long](0, classOf[java.lang.Long])
-          if (seqNr eq null) 0L else seqNr.longValue
-        })
-      .map(r => if (r.isEmpty) 0L else r.head)(ExecutionContexts.parasitic)
-
-    if (log.isDebugEnabled)
-      result.foreach(seqNr => log.debug("Highest sequence nr for persistenceId 
[{}]: [{}]", persistenceId, seqNr))
-
-    result
-  }
-
   def deleteMessagesTo(persistenceId: String, toSequenceNr: Long): 
Future[Unit] = {
     val entityType = PersistenceId.extractEntityType(persistenceId)
     val slice = persistenceExt.sliceForPersistenceId(persistenceId)
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/R2dbcJournal.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/R2dbcJournal.scala
index d141ebd..200d43a 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/R2dbcJournal.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/R2dbcJournal.scala
@@ -21,7 +21,7 @@ import scala.concurrent.Future
 import scala.util.Failure
 import scala.util.Success
 import scala.util.Try
-
+import com.typesafe.config.Config
 import org.apache.pekko
 import pekko.Done
 import pekko.actor.ActorRef
@@ -35,18 +35,15 @@ import pekko.persistence.Persistence
 import pekko.persistence.PersistentRepr
 import pekko.persistence.journal.AsyncWriteJournal
 import pekko.persistence.journal.Tagged
-import pekko.persistence.query.PersistenceQuery
-import pekko.persistence.r2dbc.R2dbcSettings
+import pekko.persistence.r2dbc.JournalSettings
 import pekko.persistence.r2dbc.internal.PubSub
 import pekko.persistence.r2dbc.journal.JournalDao.SerializedEventMetadata
 import pekko.persistence.r2dbc.journal.JournalDao.SerializedJournalRow
-import pekko.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
 import pekko.persistence.typed.PersistenceId
 import pekko.serialization.Serialization
 import pekko.serialization.SerializationExtension
 import pekko.serialization.Serializers
 import pekko.stream.scaladsl.Sink
-import com.typesafe.config.Config
 
 /**
  * INTERNAL API
@@ -92,12 +89,10 @@ private[r2dbc] final class R2dbcJournal(config: Config, 
cfgPath: String) extends
 
   private val persistenceExt = Persistence(system)
 
-  private val sharedConfigPath = cfgPath.replaceAll("""\.journal$""", "")
   private val serialization: Serialization = 
SerializationExtension(context.system)
-  private val journalSettings = 
R2dbcSettings(context.system.settings.config.getConfig(sharedConfigPath))
+  private val journalSettings = JournalSettings(config)
 
-  private val journalDao = JournalDao.fromConfig(journalSettings, 
sharedConfigPath)
-  private val query = 
PersistenceQuery(system).readJournalFor[R2dbcReadJournal](sharedConfigPath + 
".query")
+  private val journalDao = JournalDao.fromConfig(journalSettings, config)
 
   private val pubSub: Option[PubSub] =
     if (journalSettings.journalPublishEvents) Some(PubSub(system))
@@ -226,7 +221,7 @@ private[r2dbc] final class R2dbcJournal(config: Config, 
cfgPath: String) extends
     val effectiveToSequenceNr =
       if (max == Long.MaxValue) toSequenceNr
       else math.min(toSequenceNr, fromSequenceNr + max - 1)
-    query
+    journalDao
       .internalEventsByPersistenceId(persistenceId, fromSequenceNr, 
effectiveToSequenceNr)
       .runWith(Sink.foreach { row =>
         val repr = deserializeRow(serialization, row)
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/mysql/MySQLJournalDao.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/mysql/MySQLJournalDao.scala
index 9adba88..e336add 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/mysql/MySQLJournalDao.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/mysql/MySQLJournalDao.scala
@@ -20,29 +20,31 @@
 package org.apache.pekko.persistence.r2dbc.journal.mysql
 
 import scala.concurrent.ExecutionContext
-import io.r2dbc.spi.ConnectionFactory
 import org.apache.pekko
 import pekko.actor.typed.ActorSystem
 import pekko.annotation.InternalApi
-import pekko.persistence.r2dbc.R2dbcSettings
+import pekko.persistence.r2dbc.DbTimestampMonotonicIncreasing
+import pekko.persistence.r2dbc.JournalSettings
+import pekko.persistence.r2dbc.UseAppTimestamp
 import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
 import pekko.persistence.r2dbc.journal.JournalDao
+import io.r2dbc.spi.ConnectionFactory
 
 /**
  * INTERNAL API
  */
 @InternalApi
 private[r2dbc] object MySQLJournalDao {
-  def settingRequirements(journalSettings: R2dbcSettings): Unit = {
+  def settingRequirements(settings: UseAppTimestamp with 
DbTimestampMonotonicIncreasing): Unit = {
     // Application timestamps are used because MySQL does not have 
transaction_timestamp like Postgres. In future releases
     // they could be tried to be emulated, but the benefits are questionable - 
no matter where the timestamps are generated,
     // risk of clock skews remains.
-    require(journalSettings.useAppTimestamp,
+    require(settings.useAppTimestamp,
       "use-app-timestamp config must be on for MySQL support")
     // Supporting the non-monotonic increasing timestamps by incrementing the 
timestamp within the insert queries based on
     // latest row in the database seems to cause deadlocks when running tests 
like PersistTimestampSpec. Possibly this could
     // be fixed.
-    require(journalSettings.dbTimestampMonotonicIncreasing,
+    require(settings.dbTimestampMonotonicIncreasing,
       "db-timestamp-monotonic-increasing config must be on for MySQL support")
     // Also, missing RETURNING implementation makes grabbing the timestamp 
generated by the database less efficient - this
     // applies for both of the requirements above.
@@ -54,13 +56,14 @@ private[r2dbc] object MySQLJournalDao {
  */
 @InternalApi
 private[r2dbc] class MySQLJournalDao(
-    journalSettings: R2dbcSettings,
+    journalSettings: JournalSettings,
     connectionFactory: ConnectionFactory)(
     implicit ec: ExecutionContext, system: ActorSystem[_]
 ) extends JournalDao(journalSettings, connectionFactory) {
   MySQLJournalDao.settingRequirements(journalSettings)
 
   override lazy val timestampSql: String = "NOW(6)"
+  override lazy val statementTimestampSql: String = "NOW(6)"
 
   override val insertEventWithParameterTimestampSql: String =
     sql"INSERT INTO $journalTable " +
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
index eebf8bc..20aa370 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
@@ -19,23 +19,26 @@ import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.concurrent.duration.Duration
 import scala.concurrent.duration.FiniteDuration
-import io.r2dbc.spi.ConnectionFactory
 import org.apache.pekko
 import pekko.NotUsed
 import pekko.actor.typed.ActorSystem
 import pekko.annotation.InternalApi
 import pekko.persistence.r2dbc.ConnectionFactoryProvider
 import pekko.persistence.r2dbc.Dialect
-import pekko.persistence.r2dbc.R2dbcSettings
+import pekko.persistence.r2dbc.QuerySettings
 import pekko.persistence.r2dbc.internal.BySliceQuery
 import pekko.persistence.r2dbc.internal.BySliceQuery.Buckets
 import pekko.persistence.r2dbc.internal.BySliceQuery.Buckets.Bucket
+import pekko.persistence.r2dbc.internal.EventsByPersistenceIdDao
+import pekko.persistence.r2dbc.internal.HighestSequenceNrDao
 import pekko.persistence.r2dbc.internal.R2dbcExecutor
 import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
 import pekko.persistence.r2dbc.journal.JournalDao
 import pekko.persistence.r2dbc.journal.JournalDao.SerializedJournalRow
 import pekko.persistence.r2dbc.query.scaladsl.mysql.MySQLQueryDao
 import pekko.stream.scaladsl.Source
+import com.typesafe.config.Config
+import io.r2dbc.spi.ConnectionFactory
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
 
@@ -43,16 +46,16 @@ object QueryDao {
   val log: Logger = LoggerFactory.getLogger(classOf[QueryDao])
 
   def fromConfig(
-      journalSettings: R2dbcSettings,
-      sharedConfigPath: String
+      settings: QuerySettings,
+      config: Config
   )(implicit system: ActorSystem[_], ec: ExecutionContext): QueryDao = {
     val connectionFactory =
-      ConnectionFactoryProvider(system).connectionFactoryFor(sharedConfigPath 
+ ".connection-factory")
-    journalSettings.dialect match {
+      
ConnectionFactoryProvider(system).connectionFactoryFor(settings.useConnectionFactory,
 config)
+    settings.dialect match {
       case Dialect.Postgres | Dialect.Yugabyte =>
-        new QueryDao(journalSettings, connectionFactory)
+        new QueryDao(settings, connectionFactory)
       case Dialect.MySQL =>
-        new MySQLQueryDao(journalSettings, connectionFactory)
+        new MySQLQueryDao(settings, connectionFactory)
     }
   }
 }
@@ -61,11 +64,9 @@ object QueryDao {
  * INTERNAL API
  */
 @InternalApi
-private[r2dbc] class QueryDao(settings: R2dbcSettings, connectionFactory: 
ConnectionFactory)(
-    implicit
-    ec: ExecutionContext,
-    system: ActorSystem[_])
-    extends BySliceQuery.Dao[SerializedJournalRow] {
+private[r2dbc] class QueryDao(val settings: QuerySettings, connectionFactory: 
ConnectionFactory)(
+    implicit val ec: ExecutionContext, system: ActorSystem[_]) extends 
BySliceQuery.Dao[SerializedJournalRow]
+    with EventsByPersistenceIdDao with HighestSequenceNrDao {
   import JournalDao.readMetadata
   import QueryDao.log
 
@@ -139,21 +140,14 @@ private[r2dbc] class QueryDao(settings: R2dbcSettings, 
connectionFactory: Connec
     FROM $journalTable
     WHERE persistence_id = ? AND seq_nr = ? AND deleted = false"""
 
-  private val selectEventsSql = sql"""
-    SELECT slice, entity_type, persistence_id, seq_nr, db_timestamp, 
$statementTimestampSql AS read_db_timestamp, event_ser_id, event_ser_manifest, 
event_payload, writer, adapter_manifest, meta_ser_id, meta_ser_manifest, 
meta_payload
-    from $journalTable
-    WHERE persistence_id = ? AND seq_nr >= ? AND seq_nr <= ?
-    AND deleted = false
-    ORDER BY seq_nr
-    LIMIT ?"""
-
   private val allPersistenceIdsSql =
     sql"SELECT DISTINCT(persistence_id) from $journalTable ORDER BY 
persistence_id LIMIT ?"
 
   private val allPersistenceIdsAfterSql =
     sql"SELECT DISTINCT(persistence_id) from $journalTable WHERE 
persistence_id > ? ORDER BY persistence_id LIMIT ?"
 
-  private val r2dbcExecutor = new R2dbcExecutor(connectionFactory, log, 
settings.logDbCallsExceeding)(ec, system)
+  protected val r2dbcExecutor =
+    new R2dbcExecutor(connectionFactory, log, 
settings.logDbCallsExceeding)(ec, system)
 
   def currentDbTimestamp(): Future[Instant] = {
     r2dbcExecutor
@@ -189,9 +183,9 @@ private[r2dbc] class QueryDao(settings: R2dbcSettings, 
connectionFactory: Connec
         toTimestamp match {
           case Some(until) =>
             stmt.bind(2, until)
-            stmt.bind(3, settings.querySettings.bufferSize)
+            stmt.bind(3, settings.bufferSize)
           case None =>
-            stmt.bind(2, settings.querySettings.bufferSize)
+            stmt.bind(2, settings.bufferSize)
         }
         stmt
       },
@@ -311,40 +305,6 @@ private[r2dbc] class QueryDao(settings: R2dbcSettings, 
connectionFactory: Connec
           tags = Set.empty, // tags not fetched in queries (yet)
           metadata = readMetadata(row)))
 
-  def eventsByPersistenceId(
-      persistenceId: String,
-      fromSequenceNr: Long,
-      toSequenceNr: Long): Source[SerializedJournalRow, NotUsed] = {
-
-    val result = r2dbcExecutor.select(s"select eventsByPersistenceId 
[$persistenceId]")(
-      connection =>
-        connection
-          .createStatement(selectEventsSql)
-          .bind(0, persistenceId)
-          .bind(1, fromSequenceNr)
-          .bind(2, toSequenceNr)
-          .bind(3, settings.querySettings.bufferSize),
-      row =>
-        SerializedJournalRow(
-          slice = row.get[Integer]("slice", classOf[Integer]),
-          entityType = row.get("entity_type", classOf[String]),
-          persistenceId = row.get("persistence_id", classOf[String]),
-          seqNr = row.get[java.lang.Long]("seq_nr", classOf[java.lang.Long]),
-          dbTimestamp = row.get("db_timestamp", classOf[Instant]),
-          readDbTimestamp = row.get("read_db_timestamp", classOf[Instant]),
-          payload = Some(row.get("event_payload", classOf[Array[Byte]])),
-          serId = row.get[Integer]("event_ser_id", classOf[Integer]),
-          serManifest = row.get("event_ser_manifest", classOf[String]),
-          writerUuid = row.get("writer", classOf[String]),
-          tags = Set.empty, // tags not fetched in queries (yet)
-          metadata = readMetadata(row)))
-
-    if (log.isDebugEnabled)
-      result.foreach(rows => log.debug("Read [{}] events for persistenceId 
[{}]", rows.size, persistenceId))
-
-    Source.futureSource(result.map(Source(_))).mapMaterializedValue(_ => 
NotUsed)
-  }
-
   def persistenceIds(afterId: Option[String], limit: Long): Source[String, 
NotUsed] = {
     val result = r2dbcExecutor.select(s"select persistenceIds")(
       connection =>
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
index 4280f11..f721f84 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
@@ -37,12 +37,10 @@ import 
pekko.persistence.query.typed.scaladsl.EventTimestampQuery
 import pekko.persistence.query.typed.scaladsl.EventsBySliceQuery
 import pekko.persistence.query.typed.scaladsl.LoadEventQuery
 import pekko.persistence.query.{ EventEnvelope => ClassicEventEnvelope }
-import pekko.persistence.r2dbc.ConnectionFactoryProvider
-import pekko.persistence.r2dbc.R2dbcSettings
+import pekko.persistence.r2dbc.QuerySettings
 import pekko.persistence.r2dbc.internal.BySliceQuery
 import pekko.persistence.r2dbc.internal.ContinuousQuery
 import pekko.persistence.r2dbc.internal.PubSub
-import pekko.persistence.r2dbc.journal.JournalDao
 import pekko.persistence.r2dbc.journal.JournalDao.SerializedJournalRow
 import pekko.persistence.typed.PersistenceId
 import pekko.serialization.SerializationExtension
@@ -73,16 +71,14 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, 
config: Config, cfgPat
   import R2dbcReadJournal.PersistenceIdsQueryState
 
   private val log = LoggerFactory.getLogger(getClass)
-  private val sharedConfigPath = cfgPath.replaceAll("""\.query$""", "")
-  private val settings = 
R2dbcSettings(system.settings.config.getConfig(sharedConfigPath))
+  private val settings = QuerySettings(config)
 
   private implicit val typedSystem: ActorSystem[_] = system.toTyped
   import typedSystem.executionContext
   private val serialization = SerializationExtension(system)
   private val persistenceExt = Persistence(system)
-  private val connectionFactory = ConnectionFactoryProvider(typedSystem)
-    .connectionFactoryFor(sharedConfigPath + ".connection-factory")
-  private val queryDao = QueryDao.fromConfig(settings, sharedConfigPath)
+
+  private val queryDao = QueryDao.fromConfig(settings, config)
 
   private val _bySlice: BySliceQuery[SerializedJournalRow, EventEnvelope[Any]] 
= {
     val createEnvelope: (TimestampOffset, SerializedJournalRow) => 
EventEnvelope[Any] = (offset, row) => {
@@ -107,8 +103,6 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, 
config: Config, cfgPat
   private def bySlice[Event]: BySliceQuery[SerializedJournalRow, 
EventEnvelope[Event]] =
     _bySlice.asInstanceOf[BySliceQuery[SerializedJournalRow, 
EventEnvelope[Event]]]
 
-  private val journalDao = JournalDao.fromConfig(settings, sharedConfigPath)
-
   def extractEntityTypeFromPersistenceId(persistenceId: String): String =
     PersistenceId.extractEntityType(persistenceId)
 
@@ -169,7 +163,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, 
config: Config, cfgPat
           .actorRef[EventEnvelope[Event]](
             completionMatcher = PartialFunction.empty,
             failureMatcher = PartialFunction.empty,
-            bufferSize = settings.querySettings.bufferSize,
+            bufferSize = settings.bufferSize,
             overflowStrategy = OverflowStrategy.dropNew)
           .mapMaterializedValue { ref =>
             (minSlice to maxSlice).foreach { slice =>
@@ -177,7 +171,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, 
config: Config, cfgPat
               pubSub.eventTopic(entityType, slice) ! 
Topic.Subscribe(ref.toTyped[EventEnvelope[Event]])
             }
           }
-      dbSource.merge(pubSubSource).via(deduplicate(settings.querySettings.deduplicateCapacity))
+      dbSource.merge(pubSubSource).via(deduplicate(settings.deduplicateCapacity))
     } else
       dbSource
   }
@@ -227,72 +221,19 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, 
config: Config, cfgPat
       fromSequenceNr: Long,
       toSequenceNr: Long): Source[ClassicEventEnvelope, NotUsed] = {
     val highestSeqNrFut =
-      if (toSequenceNr == Long.MaxValue) journalDao.readHighestSequenceNr(persistenceId, fromSequenceNr)
+      if (toSequenceNr == Long.MaxValue) queryDao.readHighestSequenceNr(persistenceId, fromSequenceNr)
       else Future.successful(toSequenceNr)
 
     Source
       .futureSource[SerializedJournalRow, NotUsed] {
         highestSeqNrFut.map { highestSeqNr =>
-          internalEventsByPersistenceId(persistenceId, fromSequenceNr, 
highestSeqNr)
+          queryDao.internalEventsByPersistenceId(persistenceId, 
fromSequenceNr, highestSeqNr)
         }
       }
       .map(deserializeRow)
       .mapMaterializedValue(_ => NotUsed)
   }
 
-  /**
-   * INTERNAL API: Used by both journal replay and currentEventsByPersistenceId
-   */
-  @InternalApi private[r2dbc] def internalEventsByPersistenceId(
-      persistenceId: String,
-      fromSequenceNr: Long,
-      toSequenceNr: Long): Source[SerializedJournalRow, NotUsed] = {
-    def updateState(state: ByPersistenceIdState, row: SerializedJournalRow): 
ByPersistenceIdState =
-      state.copy(rowCount = state.rowCount + 1, latestSeqNr = row.seqNr)
-
-    def nextQuery(
-        state: ByPersistenceIdState,
-        highestSeqNr: Long): (ByPersistenceIdState, 
Option[Source[SerializedJournalRow, NotUsed]]) = {
-      if (state.queryCount == 0L || state.rowCount >= settings.querySettings.bufferSize) {
-        val newState = state.copy(rowCount = 0, queryCount = state.queryCount + 1)
-
-        if (state.queryCount != 0 && log.isDebugEnabled())
-          log.debug(
-            "currentEventsByPersistenceId query [{}] for persistenceId [{}], from [{}] to [{}]. Found [{}] rows in previous query.",
-            state.queryCount: java.lang.Integer,
-            persistenceId,
-            state.latestSeqNr + 1: java.lang.Long,
-            highestSeqNr: java.lang.Long,
-            state.rowCount: java.lang.Integer)
-
-        newState -> Some(
-          queryDao
-            .eventsByPersistenceId(persistenceId, state.latestSeqNr + 1, 
highestSeqNr))
-      } else {
-        log.debug(
-          "currentEventsByPersistenceId query [{}] for persistenceId [{}] completed. Found [{}] rows in previous query.",
-          state.queryCount: java.lang.Integer,
-          persistenceId,
-          state.rowCount: java.lang.Integer)
-
-        state -> None
-      }
-    }
-
-    if (log.isDebugEnabled())
-      log.debug(
-        "currentEventsByPersistenceId query for persistenceId [{}], from [{}] to [{}].",
-        persistenceId,
-        fromSequenceNr: java.lang.Long,
-        toSequenceNr: java.lang.Long)
-
-    ContinuousQuery[ByPersistenceIdState, SerializedJournalRow](
-      initialState = ByPersistenceIdState(0, 0, latestSeqNr = fromSequenceNr - 1),
-      updateState = updateState,
-      delayNextQuery = _ => None,
-      nextQuery = state => nextQuery(state, toSequenceNr))
-  }
-
   // EventTimestampQuery
   override def timestampOf(persistenceId: String, sequenceNr: Long): 
Future[Option[Instant]] = {
     queryDao.timestampOfEvent(persistenceId, sequenceNr)
@@ -323,8 +264,8 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, 
config: Config, cfgPat
     def delayNextQuery(state: ByPersistenceIdState): Option[FiniteDuration] = {
       val delay = ContinuousQuery.adjustNextDelay(
         state.rowCount,
-        settings.querySettings.bufferSize,
-        settings.querySettings.refreshInterval)
+        settings.bufferSize,
+        settings.refreshInterval)
 
       delay.foreach { d =>
         log.debug(
@@ -403,7 +344,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, 
config: Config, cfgPat
     queryDao.persistenceIds(afterId, limit)
 
   override def currentPersistenceIds(): Source[String, NotUsed] = {
-    import settings.querySettings.persistenceIdsBufferSize
+    import settings.persistenceIdsBufferSize
     def updateState(state: PersistenceIdsQueryState, pid: String): 
PersistenceIdsQueryState =
       state.copy(rowCount = state.rowCount + 1, latestPid = pid)
 
@@ -439,5 +380,4 @@ final class R2dbcReadJournal(system: ExtendedActorSystem, 
config: Config, cfgPat
       nextQuery = state => nextQuery(state))
       .mapMaterializedValue(_ => NotUsed)
   }
-
 }
diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/mysql/MySQLQueryDao.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/mysql/MySQLQueryDao.scala
index 8cbc5b6..dc04dda 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/mysql/MySQLQueryDao.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/mysql/MySQLQueryDao.scala
@@ -25,22 +25,22 @@ import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.concurrent.duration.Duration
 import scala.concurrent.duration.FiniteDuration
-import io.r2dbc.spi.ConnectionFactory
 import org.apache.pekko
 import pekko.actor.typed.ActorSystem
 import pekko.annotation.InternalApi
-import pekko.persistence.r2dbc.R2dbcSettings
+import pekko.persistence.r2dbc.QuerySettings
 import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
 import pekko.persistence.r2dbc.query.scaladsl.QueryDao
+import io.r2dbc.spi.ConnectionFactory
 
 /**
  * INTERNAL API
  */
 @InternalApi
 private[r2dbc] class MySQLQueryDao(
-    journalSettings: R2dbcSettings,
+    querySettings: QuerySettings,
     connectionFactory: ConnectionFactory
-)(implicit ec: ExecutionContext, system: ActorSystem[_]) extends 
QueryDao(journalSettings, connectionFactory) {
+)(implicit ec: ExecutionContext, system: ActorSystem[_]) extends 
QueryDao(querySettings, connectionFactory) {
 
   override lazy val statementTimestampSql: String = "NOW(6)"
 
diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/R2dbcSnapshotStore.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/R2dbcSnapshotStore.scala
index 7d637ad..a030edc 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/R2dbcSnapshotStore.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/R2dbcSnapshotStore.scala
@@ -13,20 +13,23 @@
 
 package org.apache.pekko.persistence.r2dbc.snapshot
 
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
 import org.apache.pekko
 import pekko.actor.typed.ActorSystem
 import pekko.actor.typed.scaladsl.adapter._
-import pekko.persistence.{ SelectedSnapshot, SnapshotMetadata, 
SnapshotSelectionCriteria }
-import pekko.persistence.r2dbc.R2dbcSettings
-import pekko.persistence.snapshot.SnapshotStore
-import pekko.serialization.{ Serialization, SerializationExtension }
-import com.typesafe.config.Config
-import scala.concurrent.{ ExecutionContext, Future }
-
 import pekko.annotation.InternalApi
+import pekko.persistence.r2dbc.SnapshotSettings
 import pekko.persistence.r2dbc.snapshot.SnapshotDao.SerializedSnapshotMetadata
 import pekko.persistence.r2dbc.snapshot.SnapshotDao.SerializedSnapshotRow
+import pekko.persistence.snapshot.SnapshotStore
+import pekko.persistence.SelectedSnapshot
+import pekko.persistence.SnapshotMetadata
+import pekko.persistence.SnapshotSelectionCriteria
 import pekko.serialization.Serializers
+import pekko.serialization.Serialization
+import pekko.serialization.SerializationExtension
+import com.typesafe.config.Config
 
 object R2dbcSnapshotStore {
   private def deserializeSnapshotRow(snap: SerializedSnapshotRow, 
serialization: Serialization): SelectedSnapshot =
@@ -57,9 +60,8 @@ private[r2dbc] final class R2dbcSnapshotStore(cfg: Config, 
cfgPath: String) exte
   private implicit val system: ActorSystem[_] = context.system.toTyped
 
   private val dao = {
-    val sharedConfigPath = cfgPath.replaceAll("""\.snapshot$""", "")
-    val settings = 
R2dbcSettings(context.system.settings.config.getConfig(sharedConfigPath))
-    SnapshotDao.fromConfig(settings, sharedConfigPath)
+    val settings = SnapshotSettings(cfg)
+    SnapshotDao.fromConfig(settings, cfg)
   }
 
   def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria): 
Future[Option[SelectedSnapshot]] =
diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/SnapshotDao.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/SnapshotDao.scala
index 01f3395..bc3bb94 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/SnapshotDao.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/SnapshotDao.scala
@@ -15,8 +15,6 @@ package org.apache.pekko.persistence.r2dbc.snapshot
 
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
-import io.r2dbc.spi.ConnectionFactory
-import io.r2dbc.spi.Row
 import org.apache.pekko
 import pekko.actor.typed.ActorSystem
 import pekko.annotation.InternalApi
@@ -25,11 +23,14 @@ import pekko.persistence.Persistence
 import pekko.persistence.SnapshotSelectionCriteria
 import pekko.persistence.r2dbc.ConnectionFactoryProvider
 import pekko.persistence.r2dbc.Dialect
-import pekko.persistence.r2dbc.R2dbcSettings
+import pekko.persistence.r2dbc.SnapshotSettings
 import pekko.persistence.r2dbc.internal.R2dbcExecutor
 import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
 import pekko.persistence.r2dbc.snapshot.mysql.MySQLSnapshotDao
 import pekko.persistence.typed.PersistenceId
+import com.typesafe.config.Config
+import io.r2dbc.spi.ConnectionFactory
+import io.r2dbc.spi.Row
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
 
@@ -69,16 +70,16 @@ private[r2dbc] object SnapshotDao {
       })
 
   def fromConfig(
-      journalSettings: R2dbcSettings,
-      sharedConfigPath: String
+      settings: SnapshotSettings,
+      config: Config
   )(implicit system: ActorSystem[_], ec: ExecutionContext): SnapshotDao = {
     val connectionFactory =
-      ConnectionFactoryProvider(system).connectionFactoryFor(sharedConfigPath + ".connection-factory")
-    journalSettings.dialect match {
+      ConnectionFactoryProvider(system).connectionFactoryFor(settings.useConnectionFactory, config)
+    settings.dialect match {
       case Dialect.Postgres | Dialect.Yugabyte =>
-        new SnapshotDao(journalSettings, connectionFactory)
+        new SnapshotDao(settings, connectionFactory)
       case Dialect.MySQL =>
-        new MySQLSnapshotDao(journalSettings, connectionFactory)
+        new MySQLSnapshotDao(settings, connectionFactory)
     }
   }
 }
@@ -89,7 +90,7 @@ private[r2dbc] object SnapshotDao {
  * Class for doing db interaction outside of an actor to avoid mistakes in 
future callbacks
  */
 @InternalApi
-private[r2dbc] class SnapshotDao(settings: R2dbcSettings, connectionFactory: 
ConnectionFactory)(
+private[r2dbc] class SnapshotDao(settings: SnapshotSettings, 
connectionFactory: ConnectionFactory)(
     implicit
     ec: ExecutionContext,
     system: ActorSystem[_]) {
diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/mysql/MySQLSnapshotDao.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/mysql/MySQLSnapshotDao.scala
index e725168..6baa0cf 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/mysql/MySQLSnapshotDao.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/mysql/MySQLSnapshotDao.scala
@@ -24,7 +24,7 @@ import io.r2dbc.spi.ConnectionFactory
 import org.apache.pekko
 import pekko.actor.typed.ActorSystem
 import pekko.annotation.InternalApi
-import pekko.persistence.r2dbc.R2dbcSettings
+import pekko.persistence.r2dbc.SnapshotSettings
 import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
 import pekko.persistence.r2dbc.snapshot.SnapshotDao
 
@@ -33,7 +33,7 @@ import pekko.persistence.r2dbc.snapshot.SnapshotDao
  */
 @InternalApi
 private[r2dbc] class MySQLSnapshotDao(
-    settings: R2dbcSettings, connectionFactory: ConnectionFactory
+    settings: SnapshotSettings, connectionFactory: ConnectionFactory
 )(implicit ec: ExecutionContext, system: ActorSystem[_]) extends 
SnapshotDao(settings, connectionFactory) {
 
   override val upsertSql = sql"""
diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
index 7c9d2c1..2c5d454 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
@@ -19,9 +19,6 @@ import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.concurrent.duration.Duration
 import scala.concurrent.duration.FiniteDuration
-import io.r2dbc.spi.ConnectionFactory
-import io.r2dbc.spi.R2dbcDataIntegrityViolationException
-import io.r2dbc.spi.Statement
 import org.apache.pekko
 import pekko.Done
 import pekko.NotUsed
@@ -31,7 +28,7 @@ import pekko.dispatch.ExecutionContexts
 import pekko.persistence.Persistence
 import pekko.persistence.r2dbc.ConnectionFactoryProvider
 import pekko.persistence.r2dbc.Dialect
-import pekko.persistence.r2dbc.R2dbcSettings
+import pekko.persistence.r2dbc.StateSettings
 import pekko.persistence.r2dbc.internal.BySliceQuery
 import pekko.persistence.r2dbc.internal.BySliceQuery.Buckets
 import pekko.persistence.r2dbc.internal.BySliceQuery.Buckets.Bucket
@@ -40,6 +37,10 @@ import 
pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
 import pekko.persistence.r2dbc.state.scaladsl.mysql.MySQLDurableStateDao
 import pekko.persistence.typed.PersistenceId
 import pekko.stream.scaladsl.Source
+import com.typesafe.config.Config
+import io.r2dbc.spi.ConnectionFactory
+import io.r2dbc.spi.R2dbcDataIntegrityViolationException
+import io.r2dbc.spi.Statement
 import org.slf4j.Logger
 import org.slf4j.LoggerFactory
 
@@ -64,16 +65,16 @@ import org.slf4j.LoggerFactory
   }
 
   def fromConfig(
-      journalSettings: R2dbcSettings,
-      sharedConfigPath: String
+      settings: StateSettings,
+      config: Config
   )(implicit system: ActorSystem[_], ec: ExecutionContext): DurableStateDao = {
     val connectionFactory =
-      ConnectionFactoryProvider(system).connectionFactoryFor(sharedConfigPath + ".connection-factory")
-    journalSettings.dialect match {
+      ConnectionFactoryProvider(system).connectionFactoryFor(settings.useConnectionFactory, config)
+    settings.dialect match {
       case Dialect.Postgres | Dialect.Yugabyte =>
-        new DurableStateDao(journalSettings, connectionFactory)
+        new DurableStateDao(settings, connectionFactory)
       case Dialect.MySQL =>
-        new MySQLDurableStateDao(journalSettings, connectionFactory)
+        new MySQLDurableStateDao(settings, connectionFactory)
     }
   }
 }
@@ -84,7 +85,7 @@ import org.slf4j.LoggerFactory
  * Class for encapsulating db interaction.
  */
 @InternalApi
-private[r2dbc] class DurableStateDao(settings: R2dbcSettings, 
connectionFactory: ConnectionFactory)(
+private[r2dbc] class DurableStateDao(settings: StateSettings, 
connectionFactory: ConnectionFactory)(
     implicit
     ec: ExecutionContext,
     system: ActorSystem[_])
@@ -360,9 +361,9 @@ private[r2dbc] class DurableStateDao(settings: 
R2dbcSettings, connectionFactory:
         toTimestamp match {
           case Some(until) =>
             stmt.bind(2, until)
-            stmt.bind(3, settings.querySettings.bufferSize)
+            stmt.bind(3, settings.bufferSize)
           case None =>
-            stmt.bind(2, settings.querySettings.bufferSize)
+            stmt.bind(2, settings.bufferSize)
         }
         stmt
       },
diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala
index 4d7de65..ab30892 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala
@@ -31,7 +31,7 @@ import pekko.persistence.query.TimestampOffset
 import pekko.persistence.query.UpdatedDurableState
 import 
pekko.persistence.query.scaladsl.DurableStateStorePagedPersistenceIdsQuery
 import pekko.persistence.query.typed.scaladsl.DurableStateStoreBySliceQuery
-import pekko.persistence.r2dbc.R2dbcSettings
+import pekko.persistence.r2dbc.StateSettings
 import pekko.persistence.r2dbc.internal.BySliceQuery
 import pekko.persistence.r2dbc.internal.ContinuousQuery
 import 
pekko.persistence.r2dbc.state.scaladsl.DurableStateDao.SerializedStateRow
@@ -55,14 +55,14 @@ class R2dbcDurableStateStore[A](system: 
ExtendedActorSystem, config: Config, cfg
   import R2dbcDurableStateStore.PersistenceIdsQueryState
 
   private val log = LoggerFactory.getLogger(getClass)
-  private val sharedConfigPath = cfgPath.replaceAll("""\.state$""", "")
-  private val settings = 
R2dbcSettings(system.settings.config.getConfig(sharedConfigPath))
+  private val settings = StateSettings(config)
 
   private implicit val typedSystem: ActorSystem[_] = system.toTyped
   implicit val ec: ExecutionContext = system.dispatcher
   private val serialization = SerializationExtension(system)
   private val persistenceExt = Persistence(system)
-  private val stateDao = DurableStateDao.fromConfig(settings, sharedConfigPath)
+
+  private val stateDao = DurableStateDao.fromConfig(settings, config)
 
   private val bySlice: BySliceQuery[SerializedStateRow, DurableStateChange[A]] 
= {
     val createEnvelope: (TimestampOffset, SerializedStateRow) => 
DurableStateChange[A] = (offset, row) => {
@@ -152,7 +152,7 @@ class R2dbcDurableStateStore[A](system: 
ExtendedActorSystem, config: Config, cfg
     stateDao.persistenceIds(afterId, limit)
 
   def currentPersistenceIds(): Source[String, NotUsed] = {
-    import settings.querySettings.persistenceIdsBufferSize
+    import settings.persistenceIdsBufferSize
     def updateState(state: PersistenceIdsQueryState, pid: String): 
PersistenceIdsQueryState =
       state.copy(rowCount = state.rowCount + 1, latestPid = pid)
 
diff --git a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/mysql/MySQLDurableStateDao.scala b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/mysql/MySQLDurableStateDao.scala
index 385dc21..bcd7056 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/mysql/MySQLDurableStateDao.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/mysql/MySQLDurableStateDao.scala
@@ -25,21 +25,21 @@ import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.concurrent.duration.Duration
 import scala.concurrent.duration.FiniteDuration
-import io.r2dbc.spi.ConnectionFactory
 import org.apache.pekko
 import pekko.actor.typed.ActorSystem
 import pekko.annotation.InternalApi
-import pekko.persistence.r2dbc.R2dbcSettings
+import pekko.persistence.r2dbc.StateSettings
 import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
 import pekko.persistence.r2dbc.journal.mysql.MySQLJournalDao
 import pekko.persistence.r2dbc.state.scaladsl.DurableStateDao
+import io.r2dbc.spi.ConnectionFactory
 
 /**
  * INTERNAL API
  */
 @InternalApi
 private[r2dbc] class MySQLDurableStateDao(
-    settings: R2dbcSettings,
+    settings: StateSettings,
     connectionFactory: ConnectionFactory
 )(implicit ec: ExecutionContext, system: ActorSystem[_]) extends 
DurableStateDao(settings, connectionFactory) {
   MySQLJournalDao.settingRequirements(settings)
diff --git a/core/src/main/resources/reference.conf b/core/src/test/resources/config-v1.conf
similarity index 100%
copy from core/src/main/resources/reference.conf
copy to core/src/test/resources/config-v1.conf
diff --git a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettingsSpec.scala b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettingsSpec.scala
index 324818e..c3257e0 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettingsSpec.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettingsSpec.scala
@@ -13,25 +13,33 @@
 
 package org.apache.pekko.persistence.r2dbc
 
+import com.typesafe.config.ConfigException
 import com.typesafe.config.ConfigFactory
 import io.r2dbc.postgresql.client.SSLMode
+import org.scalatest.Inspectors._
 import org.scalatest.TestSuite
 import org.scalatest.matchers.should.Matchers
 import org.scalatest.wordspec.AnyWordSpec
 
 class R2dbcSettingsSpec extends AnyWordSpec with TestSuite with Matchers {
 
+  private lazy val configV1 = ConfigFactory.parseResources("config-v1.conf")
+
   "Settings" should {
     "have table names with schema" in {
-      val config = 
ConfigFactory.parseString("pekko.persistence.r2dbc.schema=s1").withFallback(ConfigFactory.load())
-      val settings = R2dbcSettings(config.getConfig("pekko.persistence.r2dbc"))
-      settings.journalTableWithSchema shouldBe "s1.event_journal"
-      settings.snapshotsTableWithSchema shouldBe "s1.snapshot"
-      settings.durableStateTableWithSchema shouldBe "s1.durable_state"
+      val config = 
ConfigFactory.load(ConfigFactory.parseString("pekko.persistence.r2dbc.schema=s1"))
+      val journalSettings = 
JournalSettings(config.getConfig("pekko.persistence.r2dbc.journal"))
+      journalSettings.journalTableWithSchema shouldBe "s1.event_journal"
+      val snapshotSettings = 
SnapshotSettings(config.getConfig("pekko.persistence.r2dbc.snapshot"))
+      snapshotSettings.snapshotsTableWithSchema shouldBe "s1.snapshot"
+      val stateSettings = 
StateSettings(config.getConfig("pekko.persistence.r2dbc.state"))
+      stateSettings.durableStateTableWithSchema shouldBe "s1.durable_state"
 
       // by default connection is configured with options
-      settings.connectionFactorySettings shouldBe a[ConnectionFactorySettings]
-      settings.connectionFactorySettings.urlOption should not be defined
+      val connectionFactorySettings =
+        
ConnectionFactorySettings(config.getConfig("pekko.persistence.r2dbc.connection-factory"))
+      connectionFactorySettings shouldBe a[ConnectionFactorySettings]
+      connectionFactorySettings.urlOption should not be defined
     }
 
     "support connection settings build from url" in {
@@ -40,35 +48,55 @@ class R2dbcSettingsSpec extends AnyWordSpec with TestSuite 
with Matchers {
           
.parseString("pekko.persistence.r2dbc.connection-factory.url=whatever-url")
           .withFallback(ConfigFactory.load())
 
-      val settings = R2dbcSettings(config.getConfig("pekko.persistence.r2dbc"))
-      settings.connectionFactorySettings shouldBe a[ConnectionFactorySettings]
-      settings.connectionFactorySettings.urlOption shouldBe defined
+      val settings = 
ConnectionFactorySettings(config.getConfig("pekko.persistence.r2dbc.connection-factory"))
+      settings shouldBe a[ConnectionFactorySettings]
+      settings.urlOption shouldBe defined
     }
 
     "support ssl-mode as enum name" in {
       val config = ConfigFactory
         
.parseString("pekko.persistence.r2dbc.connection-factory.ssl.mode=VERIFY_FULL")
         .withFallback(ConfigFactory.load())
-      val settings = R2dbcSettings(config.getConfig("pekko.persistence.r2dbc"))
-      settings.connectionFactorySettings.sslMode shouldBe "VERIFY_FULL"
-      SSLMode.fromValue(settings.connectionFactorySettings.sslMode) shouldBe 
SSLMode.VERIFY_FULL
+      val settings = 
ConnectionFactorySettings(config.getConfig("pekko.persistence.r2dbc.connection-factory"))
+      settings.sslMode shouldBe "VERIFY_FULL"
+      SSLMode.fromValue(settings.sslMode) shouldBe SSLMode.VERIFY_FULL
     }
 
     "support ssl-mode values in lower and dashes" in {
       val config = ConfigFactory
         
.parseString("pekko.persistence.r2dbc.connection-factory.ssl.mode=verify-full")
         .withFallback(ConfigFactory.load())
-      val settings = R2dbcSettings(config.getConfig("pekko.persistence.r2dbc"))
-      settings.connectionFactorySettings.sslMode shouldBe "verify-full"
-      SSLMode.fromValue(settings.connectionFactorySettings.sslMode) shouldBe 
SSLMode.VERIFY_FULL
+      val settings = 
ConnectionFactorySettings(config.getConfig("pekko.persistence.r2dbc.connection-factory"))
+      settings.sslMode shouldBe "verify-full"
+      SSLMode.fromValue(settings.sslMode) shouldBe SSLMode.VERIFY_FULL
     }
 
     "allow to specify ConnectionFactoryOptions customizer" in {
       val config = ConfigFactory
         
.parseString("pekko.persistence.r2dbc.connection-factory.connection-factory-options-customizer=fqcn")
         .withFallback(ConfigFactory.load())
-      val settings = R2dbcSettings(config.getConfig("pekko.persistence.r2dbc"))
-      settings.connectionFactorySettings.connectionFactoryOptionsCustomizer 
shouldBe Some("fqcn")
+      val settings = 
ConnectionFactorySettings(config.getConfig("pekko.persistence.r2dbc.connection-factory"))
+      settings.connectionFactoryOptionsCustomizer shouldBe Some("fqcn")
+    }
+
+    "not work when not merged with reference config for plugin config with v1 config" in {
+      assertThrows[ConfigException.Missing](JournalSettings(configV1.getConfig("pekko.persistence.r2dbc.journal")))
+      assertThrows[ConfigException.Missing](SnapshotSettings(configV1.getConfig("pekko.persistence.r2dbc.snapshot")))
+      assertThrows[ConfigException.Missing](StateSettings(configV1.getConfig("pekko.persistence.r2dbc.state")))
+    }
+
+    "work when merged with reference config for plugin config with v1 config" in {
+      val configWithReference = ConfigFactory.load(configV1)
+
+      forEvery(
+        List(
+          
JournalSettings(configWithReference.getConfig("pekko.persistence.r2dbc.journal")),
+          
SnapshotSettings(configWithReference.getConfig("pekko.persistence.r2dbc.snapshot")),
+          
StateSettings(configWithReference.getConfig("pekko.persistence.r2dbc.state"))
+        )
+      ) { settings =>
+        settings.useConnectionFactory shouldBe 
"pekko.persistence.r2dbc.connection-factory"
+      }
     }
   }
 }
diff --git a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestConfig.scala b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestConfig.scala
index 161262e..08789e5 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestConfig.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestConfig.scala
@@ -17,7 +17,7 @@ import com.typesafe.config.Config
 import com.typesafe.config.ConfigFactory
 
 object TestConfig {
-  lazy val config: Config = {
+  lazy val unresolvedConfig: Config = {
     val defaultConfig = ConfigFactory.load()
     val dialect = defaultConfig.getString("pekko.persistence.r2dbc.dialect")
 
@@ -46,7 +46,7 @@ object TestConfig {
           """)
       case "mysql" =>
         ConfigFactory.parseString("""
-          pekko.persistence.r2dbc{
+          pekko.persistence.r2dbc {
             connection-factory {
               driver = "mysql"
               host = "localhost"
@@ -61,26 +61,23 @@ object TestConfig {
           """)
     }
 
-    // using load here so that connection-factory can be overridden
-    ConfigFactory.load(dialectConfig.withFallback(ConfigFactory.parseString("""
+    dialectConfig.withFallback(ConfigFactory.parseString("""
     pekko.loglevel = DEBUG
     pekko.persistence.journal.plugin = "pekko.persistence.r2dbc.journal"
     pekko.persistence.snapshot-store.plugin = "pekko.persistence.r2dbc.snapshot"
     pekko.persistence.state.plugin = "pekko.persistence.r2dbc.state"
-    pekko.persistence.r2dbc {
-      query {
-        refresh-interval = 1s
-      }
-    }
+    pekko.persistence.r2dbc.refresh-interval = 1s
     pekko.actor {
       serialization-bindings {
         "org.apache.pekko.persistence.r2dbc.CborSerializable" = jackson-cbor
       }
     }
     pekko.actor.testkit.typed.default-timeout = 10s
-    """)))
+    """))
   }
 
+  lazy val config: Config = ConfigFactory.load(unresolvedConfig)
+
   val backtrackingDisabledConfig: Config =
-    ConfigFactory.parseString("pekko.persistence.r2dbc.query.backtracking.enabled = off")
+    ConfigFactory.parseString("pekko.persistence.r2dbc.backtracking.enabled = off")
 }
diff --git a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestDbLifecycle.scala b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestDbLifecycle.scala
index 3055261..5adfee8 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestDbLifecycle.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestDbLifecycle.scala
@@ -15,11 +15,12 @@ package org.apache.pekko.persistence.r2dbc
 
 import scala.concurrent.Await
 import scala.concurrent.duration._
-
 import org.apache.pekko
 import pekko.actor.typed.ActorSystem
 import pekko.persistence.Persistence
 import pekko.persistence.r2dbc.internal.R2dbcExecutor
+import com.typesafe.config.Config
+import io.r2dbc.spi.ConnectionFactory
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.Suite
 import org.slf4j.LoggerFactory
@@ -30,32 +31,42 @@ trait TestDbLifecycle extends BeforeAndAfterAll { this: 
Suite =>
 
   def testConfigPath: String = "pekko.persistence.r2dbc"
 
-  lazy val r2dbcSettings: R2dbcSettings =
-    new R2dbcSettings(typedSystem.settings.config.getConfig(testConfigPath))
+  private lazy val config: Config = typedSystem.settings.config
+
+  lazy val journalSettings: JournalSettings = new 
JournalSettings(config.getConfig(testConfigPath + ".journal"))
+
+  lazy val snapshotSettings: SnapshotSettings = new 
SnapshotSettings(config.getConfig(testConfigPath + ".snapshot"))
+
+  lazy val stateSettings: StateSettings = new 
StateSettings(config.getConfig(testConfigPath + ".state"))
 
-  lazy val r2dbcExecutor: R2dbcExecutor = {
+  // making sure that test harness does not initialize connection factory for 
the plugin that is being tested
+  lazy val connectionFactoryProvider: ConnectionFactory =
+    ConnectionFactoryProvider(typedSystem)
+      .connectionFactoryFor("test.connection-factory",
+        
config.getConfig("pekko.persistence.r2dbc.connection-factory").atPath("test.connection-factory"))
+
+  // this assumes that journal, snapshot store and state use same connection 
settings
+  lazy val r2dbcExecutor: R2dbcExecutor =
     new R2dbcExecutor(
-      
ConnectionFactoryProvider(typedSystem).connectionFactoryFor(testConfigPath + 
".connection-factory"),
+      connectionFactoryProvider,
       LoggerFactory.getLogger(getClass),
-      r2dbcSettings.logDbCallsExceeding)(typedSystem.executionContext, 
typedSystem)
-  }
+      journalSettings.logDbCallsExceeding)(typedSystem.executionContext, 
typedSystem)
 
   lazy val persistenceExt: Persistence = Persistence(typedSystem)
 
   override protected def beforeAll(): Unit = {
     Await.result(
       r2dbcExecutor.updateOne("beforeAll delete")(
-        _.createStatement(s"delete from 
${r2dbcSettings.journalTableWithSchema}")),
+        _.createStatement(s"delete from 
${journalSettings.journalTableWithSchema}")),
       10.seconds)
     Await.result(
       r2dbcExecutor.updateOne("beforeAll delete")(
-        _.createStatement(s"delete from 
${r2dbcSettings.snapshotsTableWithSchema}")),
+        _.createStatement(s"delete from 
${snapshotSettings.snapshotsTableWithSchema}")),
       10.seconds)
     Await.result(
       r2dbcExecutor.updateOne("beforeAll delete")(
-        _.createStatement(s"delete from 
${r2dbcSettings.durableStateTableWithSchema}")),
+        _.createStatement(s"delete from 
${stateSettings.durableStateTableWithSchema}")),
       10.seconds)
     super.beforeAll()
   }
-
 }
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTagsSpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTagsSpec.scala
index d0c6fcc..20427d2 100644
--- 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTagsSpec.scala
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTagsSpec.scala
@@ -14,13 +14,12 @@
 package org.apache.pekko.persistence.r2dbc.journal
 
 import scala.concurrent.duration._
-
 import org.apache.pekko
 import pekko.Done
 import pekko.actor.testkit.typed.scaladsl.LogCapturing
 import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
 import pekko.actor.typed.ActorSystem
-import pekko.persistence.r2dbc.R2dbcSettings
+import pekko.persistence.r2dbc.JournalSettings
 import pekko.persistence.r2dbc.TestActors.Persister
 import pekko.persistence.r2dbc.TestConfig
 import pekko.persistence.r2dbc.TestData
@@ -36,11 +35,11 @@ class PersistTagsSpec
     with LogCapturing {
 
   override def typedSystem: ActorSystem[_] = system
-  private val settings = new 
R2dbcSettings(system.settings.config.getConfig("pekko.persistence.r2dbc"))
+  private val settings = 
JournalSettings(system.settings.config.getConfig("pekko.persistence.r2dbc.journal"))
 
   case class Row(pid: String, seqNr: Long, tags: Set[String])
 
-  private lazy val dialect = 
system.settings.config.getString("pekko.persistence.r2dbc.dialect")
+  private lazy val dialect = 
system.settings.config.getString("pekko.persistence.r2dbc.journal.dialect")
 
   private lazy val testEnabled: Boolean = {
     // tags are not implemented for MySQL
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTimestampSpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTimestampSpec.scala
index b021abb..5b29f6d 100644
--- 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTimestampSpec.scala
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTimestampSpec.scala
@@ -16,13 +16,12 @@ package org.apache.pekko.persistence.r2dbc.journal
 import java.time.Instant
 
 import scala.concurrent.duration._
-
 import org.apache.pekko
 import pekko.Done
 import pekko.actor.testkit.typed.scaladsl.LogCapturing
 import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
 import pekko.actor.typed.ActorSystem
-import pekko.persistence.r2dbc.R2dbcSettings
+import pekko.persistence.r2dbc.JournalSettings
 import pekko.persistence.r2dbc.TestActors.Persister
 import pekko.persistence.r2dbc.TestConfig
 import pekko.persistence.r2dbc.TestData
@@ -39,7 +38,7 @@ class PersistTimestampSpec
     with LogCapturing {
 
   override def typedSystem: ActorSystem[_] = system
-  private val settings = new 
R2dbcSettings(system.settings.config.getConfig("pekko.persistence.r2dbc"))
+  private val settings = 
JournalSettings(system.settings.config.getConfig("pekko.persistence.r2dbc.journal"))
   private val serialization = SerializationExtension(system)
 
   case class Row(pid: String, seqNr: Long, dbTimestamp: Instant, event: String)
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/RuntimePluginConfigSpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/RuntimePluginConfigSpec.scala
new file mode 100644
index 0000000..ebef1eb
--- /dev/null
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/RuntimePluginConfigSpec.scala
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pekko.persistence.r2dbc.journal
+
+import scala.collection.immutable.ListSet
+import scala.concurrent.Await
+import scala.concurrent.duration._
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.ExtendedActorSystem
+import pekko.actor.testkit.typed.scaladsl.LogCapturing
+import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
+import pekko.actor.typed.ActorRef
+import pekko.actor.typed.ActorSystem
+import pekko.actor.typed.Behavior
+import pekko.actor.typed.scaladsl.adapter._
+import pekko.persistence.JournalProtocol.RecoverySuccess
+import pekko.persistence.JournalProtocol.ReplayMessages
+import pekko.persistence.JournalProtocol.ReplayedMessage
+import pekko.persistence.Persistence
+import pekko.persistence.SelectedSnapshot
+import pekko.persistence.SnapshotProtocol.LoadSnapshot
+import pekko.persistence.SnapshotProtocol.LoadSnapshotResult
+import pekko.persistence.SnapshotSelectionCriteria
+import pekko.persistence.query.PersistenceQuery
+import pekko.persistence.r2dbc.ConnectionFactoryProvider
+import pekko.persistence.r2dbc.JournalSettings
+import pekko.persistence.r2dbc.SnapshotSettings
+import pekko.persistence.r2dbc.StateSettings
+import pekko.persistence.r2dbc.TestConfig
+import pekko.persistence.r2dbc.internal.R2dbcExecutor
+import pekko.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
+import pekko.persistence.r2dbc.state.scaladsl.R2dbcDurableStateStore
+import pekko.persistence.state.scaladsl.GetObjectResult
+import pekko.persistence.typed.PersistenceId
+import pekko.persistence.typed.scaladsl.Effect
+import pekko.persistence.typed.scaladsl.EventSourcedBehavior
+import pekko.persistence.typed.scaladsl.RetentionCriteria
+import pekko.stream.scaladsl.Sink
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.Inside
+import org.scalatest.wordspec.AnyWordSpecLike
+import org.slf4j.LoggerFactory
+
+object RuntimePluginConfigSpec {
+  // Fixtures used by the spec: each one builds its own plugin configuration under `configKey`.
+  trait EventSourced {
+    import EventSourced._
+
+    def configKey: String
+    def database: String
+    // Copies pekko.persistence.r2dbc to `configKey`, overrides the database, and points journal, query and snapshot at that connection factory.
+    lazy val config: Config =
+      ConfigFactory
+        .load(
+          ConfigFactory
+            .parseString(
+              s"""
+              $configKey = $${pekko.persistence.r2dbc}
+              $configKey = {
+                connection-factory {
+                  database = "$database"
+                }
+
+                journal.$configKey.connection-factory = 
$${$configKey.connection-factory}
+                journal.use-connection-factory = 
"$configKey.connection-factory"
+                query.$configKey.connection-factory = 
$${$configKey.connection-factory}
+                query.use-connection-factory = "$configKey.connection-factory"
+                snapshot.$configKey.connection-factory = 
$${$configKey.connection-factory}
+                snapshot.use-connection-factory = 
"$configKey.connection-factory"
+              }
+              """
+            )
+            .withFallback(TestConfig.unresolvedConfig)
+        )
+    // Joins every saved text onto the state with "|" and passes `config` to the journal and snapshot plugins under `configKey`.
+    def apply(persistenceId: String): Behavior[Command] =
+      EventSourcedBehavior[Command, String, String](
+        PersistenceId.ofUniqueId(persistenceId),
+        "",
+        (state, cmd) =>
+          cmd match {
+            case Save(text, replyTo) =>
+              Effect.persist(text).thenRun(_ => replyTo ! Done)
+            case ShowMeWhatYouGot(replyTo) =>
+              replyTo ! state
+              Effect.none
+            case Stop =>
+              Effect.stop()
+          },
+        (state, evt) => Seq(state, evt).filter(_.nonEmpty).mkString("|"))
+        .withRetention(RetentionCriteria.snapshotEvery(1, Int.MaxValue))
+        .withJournalPluginId(s"$configKey.journal")
+        .withJournalPluginConfig(Some(config))
+        .withSnapshotPluginId(s"$configKey.snapshot")
+        .withSnapshotPluginConfig(Some(config))
+  }
+  object EventSourced {
+    sealed trait Command
+    case class Save(text: String, replyTo: ActorRef[Done]) extends Command
+    case class ShowMeWhatYouGot(replyTo: ActorRef[String]) extends Command
+    case object Stop extends Command
+  }
+  // Durable state fixture: a store built from the state plugin section under `configKey`.
+  trait DurableState {
+    def typedSystem: ActorSystem[_]
+    def configKey: String
+    def database: String
+    // Same layout as the event sourced config, but only the state plugin is pointed at the connection factory.
+    lazy val config: Config =
+      ConfigFactory
+        .load(
+          ConfigFactory
+            .parseString(
+              s"""
+              $configKey = $${pekko.persistence.r2dbc}
+              $configKey = {
+                connection-factory {
+                  database = "$database"
+                }
+
+                state.$configKey.connection-factory = 
$${$configKey.connection-factory}
+                state.use-connection-factory = "$configKey.connection-factory"
+              }
+              """
+            )
+            .withFallback(TestConfig.unresolvedConfig)
+        )
+    // Constructed directly from the `$configKey.state` section of the config built above.
+    val store = new R2dbcDurableStateStore[String](
+      typedSystem.toClassic.asInstanceOf[ExtendedActorSystem],
+      config.getConfig(s"$configKey.state"),
+      ""
+    )
+  }
+}
+
+class RuntimePluginConfigSpec
+    extends ScalaTestWithActorTestKit(TestConfig.config)
+    with AnyWordSpecLike
+    with BeforeAndAfterEach
+    with LogCapturing
+    with Inside {
+  import RuntimePluginConfigSpec._
+  // Two plugin configurations ("plugin1", "plugin2") with different databases; each test writes the same id to both.
+  private lazy val eventSourced1 = new EventSourced {
+    override def configKey: String = "plugin1"
+    override def database: String = "database1"
+  }
+  private lazy val eventSourced2 = new EventSourced {
+    override def configKey: String = "plugin2"
+    override def database: String = "database2"
+  }
+
+  private lazy val state1 = new DurableState {
+    override def typedSystem: ActorSystem[_] = system
+    override def configKey: String = "plugin1"
+    override def database: String = "database1"
+  }
+  private lazy val state2 = new DurableState {
+    override def typedSystem: ActorSystem[_] = system
+    override def configKey: String = "plugin2"
+    override def database: String = "database2"
+  }
+  // Empties the journal, snapshot and durable state tables of both plugin configurations before every test.
+  override protected def beforeEach(): Unit = {
+    super.beforeEach()
+
+    ListSet(eventSourced1, eventSourced2).foreach { eventSourced =>
+      val journalConfig = 
eventSourced.config.getConfig(s"${eventSourced.configKey}.journal")
+      val journalSettings: JournalSettings = JournalSettings(journalConfig)
+
+      val snapshotSettings: SnapshotSettings =
+        
SnapshotSettings(eventSourced.config.getConfig(s"${eventSourced.configKey}.snapshot"))
+
+      // making sure that test harness does not initialize connection factory 
for the plugin that is being tested
+      val connectionFactoryProvider =
+        ConnectionFactoryProvider(system)
+          
.connectionFactoryFor(s"test.${eventSourced.configKey}.connection-factory",
+            
journalConfig.getConfig(journalSettings.useConnectionFactory).atPath(
+              s"test.${eventSourced.configKey}.connection-factory"))
+
+      // this assumes that journal, snapshot store and state use same 
connection settings
+      val r2dbcExecutor: R2dbcExecutor =
+        new R2dbcExecutor(
+          connectionFactoryProvider,
+          LoggerFactory.getLogger(getClass),
+          journalSettings.logDbCallsExceeding)(system.executionContext, system)
+
+      Await.result(
+        r2dbcExecutor.updateOne("beforeAll delete")(
+          _.createStatement(s"delete from 
${journalSettings.journalTableWithSchema}")),
+        10.seconds)
+      Await.result(
+        r2dbcExecutor.updateOne("beforeAll delete")(
+          _.createStatement(s"delete from 
${snapshotSettings.snapshotsTableWithSchema}")),
+        10.seconds)
+    }
+    // Same clean-up for the durable state table of each plugin configuration.
+    ListSet(state1, state2).foreach { state =>
+      val stateConfig = state.config.getConfig(s"${state.configKey}.state")
+      val stateSettings: StateSettings = StateSettings(stateConfig)
+
+      // making sure that test harness does not initialize connection factory 
for the plugin that is being tested
+      val connectionFactoryProvider =
+        ConnectionFactoryProvider(system)
+          .connectionFactoryFor(s"test.${state.configKey}.connection-factory",
+            state.config.getConfig(stateSettings.useConnectionFactory).atPath(
+              s"test.${state.configKey}.connection-factory"))
+
+      val r2dbcExecutor: R2dbcExecutor =
+        new R2dbcExecutor(
+          connectionFactoryProvider,
+          LoggerFactory.getLogger(getClass),
+          stateSettings.logDbCallsExceeding)(system.executionContext, system)
+
+      Await.result(
+        r2dbcExecutor.updateOne("beforeAll delete")(
+          _.createStatement(s"delete from 
${stateSettings.durableStateTableWithSchema}")),
+        10.seconds)
+    }
+  }
+
+  "Runtime plugin config" should {
+    "work for journal, query and snapshot store plugins" in {
+      val probe = createTestProbe[Any]()
+
+      {
+        // one actor in each journal with same id
+        val j1 = spawn(eventSourced1("id1"))
+        val j2 = spawn(eventSourced2("id1"))
+        j1 ! EventSourced.Save("j1m1", probe.ref)
+        probe.receiveMessage()
+        j2 ! EventSourced.Save("j2m1", probe.ref)
+        probe.receiveMessage()
+      }
+
+      {
+        def assertJournal(eventSourced: EventSourced, expectedEvent: String) = 
{
+          val ref = 
Persistence(system).journalFor(s"${eventSourced.configKey}.journal", 
eventSourced.config)
+          ref.tell(ReplayMessages(0, Long.MaxValue, Long.MaxValue, "id1", 
probe.ref.toClassic), probe.ref.toClassic)
+          inside(probe.receiveMessage()) {
+            case ReplayedMessage(persistentRepr) =>
+              persistentRepr.persistenceId shouldBe "id1"
+              persistentRepr.payload shouldBe expectedEvent
+          }
+          probe.expectMessage(RecoverySuccess(1))
+        }
+
+        assertJournal(eventSourced1, "j1m1")
+        assertJournal(eventSourced2, "j2m1")
+      }
+
+      {
+        def assertQuery(eventSourced: EventSourced, expectedEvent: String) = {
+          val readJournal =
+            
PersistenceQuery(system).readJournalFor[R2dbcReadJournal](s"${eventSourced.configKey}.query",
+              eventSourced.config)
+          val events = readJournal.currentEventsByPersistenceId("id1", 0, 
Long.MaxValue)
+            .map(_.event)
+            .runWith(Sink.seq).futureValue
+          events should contain theSameElementsAs Seq(expectedEvent)
+        }
+
+        assertQuery(eventSourced1, "j1m1")
+        assertQuery(eventSourced2, "j2m1")
+      }
+
+      {
+        def assertSnapshot(eventSourced: EventSourced, expectedSnapshot: 
String) = {
+          val ref = 
Persistence(system).snapshotStoreFor(s"${eventSourced.configKey}.snapshot", 
eventSourced.config)
+          ref.tell(LoadSnapshot("id1", SnapshotSelectionCriteria.Latest, 
Long.MaxValue),
+            probe.ref.toClassic)
+          inside(probe.receiveMessage()) {
+            case LoadSnapshotResult(Some(SelectedSnapshot(_, snapshot)), _) =>
+              snapshot shouldBe expectedSnapshot
+          }
+        }
+
+        assertSnapshot(eventSourced1, "j1m1")
+        assertSnapshot(eventSourced2, "j2m1")
+      }
+    }
+
+    "work for durable state plugin" in {
+      // persist data on both plugins
+      state1.store.upsertObject("id1", 1, "j1m1", "").futureValue
+      state2.store.upsertObject("id1", 1, "j2m1", "").futureValue
+
+      def assertState(state: DurableState, expectedState: String) = {
+        inside(state.store.getObject("id1").futureValue) {
+          case GetObjectResult(Some(value), revision) =>
+            value shouldBe expectedState
+            revision shouldBe 1
+        }
+      }
+
+      assertState(state1, "j1m1")
+      assertState(state2, "j2m1")
+    }
+  }
+}
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsByPersistenceIdSpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsByPersistenceIdSpec.scala
index d08d2da..bbb2c9f 100644
--- 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsByPersistenceIdSpec.scala
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsByPersistenceIdSpec.scala
@@ -22,7 +22,6 @@ import pekko.actor.typed.ActorSystem
 import pekko.persistence.query.PersistenceQuery
 import pekko.persistence.query.TimestampOffset
 import pekko.persistence.query.{ EventEnvelope => ClassicEventEnvelope }
-import pekko.persistence.r2dbc.R2dbcSettings
 import pekko.persistence.r2dbc.TestActors
 import pekko.persistence.r2dbc.TestActors.Persister
 import pekko.persistence.r2dbc.TestActors.Persister.PersistWithAck
@@ -52,7 +51,6 @@ class EventsByPersistenceIdSpec
   import EventsByPersistenceIdSpec._
 
   override def typedSystem: ActorSystem[_] = system
-  private val settings = new 
R2dbcSettings(system.settings.config.getConfig("pekko.persistence.r2dbc"))
 
   private val query = 
PersistenceQuery(testKit.system).readJournalFor[R2dbcReadJournal](R2dbcReadJournal.Identifier)
 
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
index d980798..385252e 100644
--- 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
@@ -24,7 +24,7 @@ import pekko.persistence.query.NoOffset
 import pekko.persistence.query.PersistenceQuery
 import pekko.persistence.query.typed.EventEnvelope
 import pekko.persistence.r2dbc.Dialect
-import pekko.persistence.r2dbc.R2dbcSettings
+import pekko.persistence.r2dbc.QuerySettings
 import pekko.persistence.r2dbc.TestConfig
 import pekko.persistence.r2dbc.TestData
 import pekko.persistence.r2dbc.TestDbLifecycle
@@ -45,7 +45,7 @@ class EventsBySliceBacktrackingSpec
     with LogCapturing {
 
   override def typedSystem: ActorSystem[_] = system
-  private val settings = new 
R2dbcSettings(system.settings.config.getConfig("pekko.persistence.r2dbc"))
+  private val settings = 
QuerySettings(system.settings.config.getConfig("pekko.persistence.r2dbc.query"))
 
   private val query = PersistenceQuery(testKit.system)
     .readJournalFor[R2dbcReadJournal](R2dbcReadJournal.Identifier)
@@ -135,14 +135,14 @@ class EventsBySliceBacktrackingSpec
       writeEvent(slice2, pid2, 1L, startTime.plusMillis(2), "e2-1")
 
       // no backtracking yet
-      result.expectNoMessage(settings.querySettings.refreshInterval + 
100.millis)
+      result.expectNoMessage(settings.refreshInterval + 100.millis)
 
       // after 1/2 of the backtracking widow, to kick off a backtracking query
       writeEvent(
         slice1,
         pid1,
         4L,
-        
startTime.plusMillis(settings.querySettings.backtrackingWindow.toMillis / 
2).plusMillis(4),
+        startTime.plusMillis(settings.backtrackingWindow.toMillis / 
2).plusMillis(4),
         "e1-4")
       val env6 = result.expectNext()
       env6.persistenceId shouldBe pid1
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
index bc2bcfc..125a2f8 100644
--- 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
@@ -44,15 +44,18 @@ import com.typesafe.config.ConfigFactory
 import org.scalatest.wordspec.AnyWordSpecLike
 
 object EventsBySlicePubSubSpec {
-  def config: Config = ConfigFactory
-    .parseString("""
+  def config: Config =
+    ConfigFactory.load(
+      ConfigFactory
+        .parseString("""
     pekko.persistence.r2dbc {
       journal.publish-events = on
       # no events from database query, only via pub-sub
-      query.behind-current-time = 5 minutes
+      behind-current-time = 5 minutes
     }
     """)
-    
.withFallback(TestConfig.backtrackingDisabledConfig.withFallback(TestConfig.config))
+        
.withFallback(TestConfig.backtrackingDisabledConfig.withFallback(TestConfig.unresolvedConfig))
+    )
 }
 
 class EventsBySlicePubSubSpec
@@ -99,6 +102,8 @@ class EventsBySlicePubSubSpec
   "EventsBySlices pub-sub" should {
 
     "publish new events" in new Setup {
+      
system.settings.config.getBoolean("pekko.persistence.r2dbc.journal.publish-events")
 shouldBe true
+      
system.settings.config.getBoolean("pekko.persistence.r2dbc.query.publish-events")
 shouldBe true
 
       val result: TestSubscriber.Probe[EventEnvelope[String]] =
         query.eventsBySlices[String](setupEntityType, slice, slice, 
NoOffset).runWith(sinkProbe).request(10)
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceSpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceSpec.scala
index ba2fdd8..6d52110 100644
--- 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceSpec.scala
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceSpec.scala
@@ -29,7 +29,6 @@ import pekko.persistence.query.TimestampOffset
 import pekko.persistence.query.typed.EventEnvelope
 import pekko.persistence.query.typed.scaladsl.EventTimestampQuery
 import pekko.persistence.query.typed.scaladsl.LoadEventQuery
-import pekko.persistence.r2dbc.R2dbcSettings
 import pekko.persistence.r2dbc.TestActors
 import pekko.persistence.r2dbc.TestActors.Persister
 import pekko.persistence.r2dbc.TestActors.Persister.Persist
@@ -83,7 +82,6 @@ class EventsBySliceSpec
   import EventsBySliceSpec._
 
   override def typedSystem: ActorSystem[_] = system
-  private val settings = new 
R2dbcSettings(system.settings.config.getConfig("pekko.persistence.r2dbc"))
 
   private val query = 
PersistenceQuery(testKit.system).readJournalFor[R2dbcReadJournal](R2dbcReadJournal.Identifier)
 
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateBySliceSpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateBySliceSpec.scala
index a344643..5c30435 100644
--- 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateBySliceSpec.scala
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateBySliceSpec.scala
@@ -29,7 +29,6 @@ import pekko.persistence.query.NoOffset
 import pekko.persistence.query.Offset
 import pekko.persistence.query.TimestampOffset
 import pekko.persistence.query.UpdatedDurableState
-import pekko.persistence.r2dbc.R2dbcSettings
 import pekko.persistence.r2dbc.TestActors
 import pekko.persistence.r2dbc.TestActors.DurableStatePersister.Persist
 import pekko.persistence.r2dbc.TestActors.DurableStatePersister.PersistWithAck
@@ -71,7 +70,6 @@ class DurableStateBySliceSpec
   import DurableStateBySliceSpec._
 
   override def typedSystem: ActorSystem[_] = system
-  private val settings = new 
R2dbcSettings(system.settings.config.getConfig("pekko.persistence.r2dbc"))
 
   private val query = DurableStateStoreRegistry(testKit.system)
     
.durableStateStoreFor[R2dbcDurableStateStore[String]](R2dbcDurableStateStore.Identifier)
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateStoreSpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateStoreSpec.scala
index 43fdf79..7fb9841 100644
--- 
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateStoreSpec.scala
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateStoreSpec.scala
@@ -83,7 +83,7 @@ class DurableStateStoreSpec
     }
 
     "detect and reject concurrent updates" in {
-      if (!r2dbcSettings.durableStateAssertSingleWriter)
+      if (!stateSettings.durableStateAssertSingleWriter)
         pending
 
       val entityType = nextEntityType()
diff --git a/docs/src/main/paradox/connection-config.md 
b/docs/src/main/paradox/config.md
similarity index 53%
rename from docs/src/main/paradox/connection-config.md
rename to docs/src/main/paradox/config.md
index c96a6b6..b6da7ae 100644
--- a/docs/src/main/paradox/connection-config.md
+++ b/docs/src/main/paradox/config.md
@@ -1,4 +1,6 @@
-# Connection configuration
+# Configuration
+
+## Connection configuration
 
 Shared configuration for the connection pool is located under 
`pekko.persistence.r2dbc.connection-factory`.
 You have to set at least:
@@ -14,6 +16,14 @@ MySQL:
 
 ## Reference configuration 
 
-The following can be overridden in your `application.conf` for the connection 
settings:
+The following configuration can be overridden in your `application.conf`:
 
 @@snip [reference.conf](/core/src/main/resources/reference.conf) 
{#connection-settings}
+
+## Plugin configuration at runtime
+
+The journal, snapshot store, query and durable state plugins can also be configured at runtime.
+
+The following example demonstrates how to select, at runtime, the database in which the events and 
snapshots of an `EventSourcedBehavior` are stored:
+
+@@snip 
[RuntimePluginConfigExample.scala](/docs/src/test/scala/docs/home/RuntimePluginConfigExample.scala)
 { #runtime-plugin-config }
diff --git a/docs/src/main/paradox/durable-state-store.md 
b/docs/src/main/paradox/durable-state-store.md
index 2e8945b..397413f 100644
--- a/docs/src/main/paradox/durable-state-store.md
+++ b/docs/src/main/paradox/durable-state-store.md
@@ -19,7 +19,7 @@ pekko.persistence.state.plugin = 
"pekko.persistence.r2dbc.state"
 It can also be enabled with the `durableStateStorePluginId` for a specific 
`DurableStateBehavior` and multiple
 plugin configurations are supported.
 
-See also @ref:[Connection configuration](connection-config.md).
+See also @ref:[Configuration](config.md).
 
 ### Reference configuration
 
diff --git a/docs/src/main/paradox/index.md b/docs/src/main/paradox/index.md
index 76947a3..599b986 100644
--- a/docs/src/main/paradox/index.md
+++ b/docs/src/main/paradox/index.md
@@ -8,7 +8,7 @@ The Pekko Persistence R2DBC plugin allows for using SQL 
database with R2DBC as a
 
 * [overview](overview.md)
 * [Getting Started](getting-started.md)
-* [Getting Started](connection-config.md)
+* [Configuration](config.md)
 * [License Report](license-report.md)
 * [Journal Plugin](journal.md)
 * [Snapshot Plugin](snapshots.md)
diff --git a/docs/src/main/paradox/journal.md b/docs/src/main/paradox/journal.md
index cb33548..1b76ea8 100644
--- a/docs/src/main/paradox/journal.md
+++ b/docs/src/main/paradox/journal.md
@@ -23,7 +23,7 @@ pekko.persistence.journal.plugin = 
"pekko.persistence.r2dbc.journal"
 It can also be enabled with the `journalPluginId` for a specific 
`EventSourcedBehavior` and multiple
 plugin configurations are supported.
 
-See also @ref:[Connection configuration](connection-config.md).
+See also @ref:[Configuration](config.md).
 
 ### Reference configuration 
 
diff --git a/docs/src/main/paradox/projection.md 
b/docs/src/main/paradox/projection.md
index f01c4cc..a1ca6e3 100644
--- a/docs/src/main/paradox/projection.md
+++ b/docs/src/main/paradox/projection.md
@@ -45,7 +45,7 @@ need to be created in the configured database, see schema 
definition in @ref:[Cr
 ## Configuration
 
 By default, `pekko-projection-r2dbc` uses the same connection pool and 
`dialect` as `pekko-persistence-r2dbc`, see
-@ref:[Connection configuration](connection-config.md).
+@ref:[Connection configuration](config.md#connection-configuration).
 
 ### Reference configuration
 
diff --git a/docs/src/main/paradox/query.md b/docs/src/main/paradox/query.md
index edf095e..e4ba5c9 100644
--- a/docs/src/main/paradox/query.md
+++ b/docs/src/main/paradox/query.md
@@ -152,5 +152,5 @@ Query configuration is under 
`pekko.persistence.r2dbc.query`. Here's the default
 
 @@snip [reference.conf](/core/src/main/resources/reference.conf) { 
#query-settings }
 
-The query plugin shares the connection pool as the rest of the plugin, see 
@ref:[Connection configuration](connection-config.md).
+The query plugin shares the same connection pool as the rest of the plugins, see 
@ref:[Connection configuration](config.md#connection-configuration).
 
diff --git a/docs/src/main/paradox/snapshots.md 
b/docs/src/main/paradox/snapshots.md
index e387a23..f7c1fb3 100644
--- a/docs/src/main/paradox/snapshots.md
+++ b/docs/src/main/paradox/snapshots.md
@@ -17,7 +17,7 @@ pekko.persistence.snapshot-store.plugin = 
"pekko.persistence.r2dbc.snapshot"
 It can also be enabled with the `snapshotPluginId` for a specific 
`EventSourcedBehavior` and multiple
 plugin configurations are supported.
 
-See also @ref:[Connection configuration](connection-config.md).
+See also @ref:[Configuration](config.md).
 
 ### Reference configuration
 
diff --git a/docs/src/test/scala/docs/home/RuntimePluginConfigExample.scala 
b/docs/src/test/scala/docs/home/RuntimePluginConfigExample.scala
new file mode 100644
index 0000000..559f54b
--- /dev/null
+++ b/docs/src/test/scala/docs/home/RuntimePluginConfigExample.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package docs.home
+
+import org.apache.pekko
+import pekko.actor.typed.scaladsl.ActorContext
+import pekko.persistence.typed.PersistenceId
+import pekko.persistence.typed.scaladsl.EventSourcedBehavior
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
+
+object RuntimePluginConfigExample {
+
+  type Command
+  val context: ActorContext[_] = ???
+
+  // #runtime-plugin-config
+  def eventSourcedBehaviorForDatabase(database: String) = {
+    val configKey = s"config-for-$database"
+
+    val config: Config =
+      ConfigFactory
+        .load(
+          ConfigFactory
+            .parseString(
+              s"""
+              $configKey = $${pekko.persistence.r2dbc}
+              $configKey = {
+                connection-factory {
+                  database = "$database"
+                }
+
+                journal.$configKey.connection-factory = 
$${$configKey.connection-factory}
+                journal.use-connection-factory = 
"$configKey.connection-factory"
+                snapshot.$configKey.connection-factory = 
$${$configKey.connection-factory}
+                snapshot.use-connection-factory = 
"$configKey.connection-factory"
+              }
+              """
+            )
+        )
+
+    (persistenceId: String) =>
+      EventSourcedBehavior[Command, String, String](
+        PersistenceId.ofUniqueId(persistenceId),
+        emptyState = ???,
+        commandHandler = ???,
+        eventHandler = ???)
+        .withJournalPluginId(s"$configKey.journal")
+        .withJournalPluginConfig(Some(config))
+        .withSnapshotPluginId(s"$configKey.snapshot")
+        .withSnapshotPluginConfig(Some(config))
+  }
+
+  val eventSourcedBehaviorForDatabase1 = 
eventSourcedBehaviorForDatabase("database-1")
+  context.spawn(eventSourcedBehaviorForDatabase1("persistence-id-1"), 
"Actor-1")
+  context.spawn(eventSourcedBehaviorForDatabase1("persistence-id-2"), 
"Actor-2")
+
+  val eventSourcedBehaviorForDatabase2 = 
eventSourcedBehaviorForDatabase("database-2")
+  context.spawn(eventSourcedBehaviorForDatabase2("persistence-id-1"), 
"Actor-3")
+  context.spawn(eventSourcedBehaviorForDatabase2("persistence-id-2"), 
"Actor-4")
+  // #runtime-plugin-config
+}
diff --git 
a/migration/src/main/scala/org/apache/pekko/persistence/r2dbc/migration/MigrationTool.scala
 
b/migration/src/main/scala/org/apache/pekko/persistence/r2dbc/migration/MigrationTool.scala
index 33a9868..055138c 100644
--- 
a/migration/src/main/scala/org/apache/pekko/persistence/r2dbc/migration/MigrationTool.scala
+++ 
b/migration/src/main/scala/org/apache/pekko/persistence/r2dbc/migration/MigrationTool.scala
@@ -14,12 +14,12 @@
 package org.apache.pekko.persistence.r2dbc.migration
 
 import java.time.Instant
+
 import scala.concurrent.Future
 import scala.concurrent.duration._
 import scala.util.Failure
 import scala.util.Success
 import scala.util.Try
-
 import org.apache.pekko
 import pekko.Done
 import pekko.actor.typed.ActorSystem
@@ -32,13 +32,14 @@ import pekko.persistence.SelectedSnapshot
 import pekko.persistence.SnapshotProtocol.LoadSnapshot
 import pekko.persistence.SnapshotProtocol.LoadSnapshotResult
 import pekko.persistence.SnapshotSelectionCriteria
-import pekko.persistence.query.{ EventEnvelope => ClassicEventEnvelope }
 import pekko.persistence.query.PersistenceQuery
 import pekko.persistence.query.scaladsl.CurrentEventsByPersistenceIdQuery
 import pekko.persistence.query.scaladsl.CurrentPersistenceIdsQuery
 import pekko.persistence.query.scaladsl.ReadJournal
+import pekko.persistence.query.{ EventEnvelope => ClassicEventEnvelope }
 import pekko.persistence.r2dbc.ConnectionFactoryProvider
-import pekko.persistence.r2dbc.R2dbcSettings
+import pekko.persistence.r2dbc.JournalSettings
+import pekko.persistence.r2dbc.SnapshotSettings
 import pekko.persistence.r2dbc.journal.JournalDao
 import pekko.persistence.r2dbc.journal.JournalDao.SerializedEventMetadata
 import pekko.persistence.r2dbc.journal.JournalDao.SerializedJournalRow
@@ -116,16 +117,20 @@ class MigrationTool(system: ActorSystem[_]) {
   private val parallelism = migrationConfig.getInt("parallelism")
 
   private val targetPluginId = 
migrationConfig.getString("target.persistence-plugin-id")
-  private val targetR2dbcSettings = 
R2dbcSettings(system.settings.config.getConfig(targetPluginId))
+  private val targetConfig = system.settings.config.getConfig(targetPluginId)
+  private val targetJournalSettings = 
JournalSettings(targetConfig.getConfig("journal"))
+  private val targetSnapshotettings = 
SnapshotSettings(targetConfig.getConfig("snapshot"))
 
   private val serialization: Serialization = SerializationExtension(system)
 
-  private val targetConnectionFactory = ConnectionFactoryProvider(system)
-    .connectionFactoryFor(targetPluginId + ".connection-factory")
+  private val targetJournalConnectionFactory = 
ConnectionFactoryProvider(system)
+    .connectionFactoryFor(targetJournalSettings.useConnectionFactory)
   private val targetJournalDao =
-    new JournalDao(targetR2dbcSettings, targetConnectionFactory)
+    new JournalDao(targetJournalSettings, targetJournalConnectionFactory)
   private val targetSnapshotDao =
-    new SnapshotDao(targetR2dbcSettings, targetConnectionFactory)
+    new SnapshotDao(targetSnapshotettings,
+      ConnectionFactoryProvider(system)
+        .connectionFactoryFor(targetSnapshotettings.useConnectionFactory))
 
   private val targetBatch = migrationConfig.getInt("target.batch")
 
@@ -138,7 +143,7 @@ class MigrationTool(system: ActorSystem[_]) {
   private lazy val sourceSnapshotStore = 
Persistence(system).snapshotStoreFor(sourceSnapshotPluginId)
 
   private[r2dbc] val migrationDao =
-    new MigrationToolDao(targetConnectionFactory, 
targetR2dbcSettings.logDbCallsExceeding)
+    new MigrationToolDao(targetJournalConnectionFactory, 
targetJournalSettings.logDbCallsExceeding)
 
   private lazy val createProgressTable: Future[Done] =
     migrationDao.createProgressTable()
diff --git 
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedEndToEndSpec.scala
 
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedEndToEndSpec.scala
index f633fc4..b68c9e7 100644
--- 
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedEndToEndSpec.scala
+++ 
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedEndToEndSpec.scala
@@ -19,6 +19,7 @@ import java.util.UUID
 import scala.concurrent.ExecutionContext
 import scala.concurrent.Future
 import scala.concurrent.duration._
+import org.apache.pekko.persistence.r2dbc.QuerySettings
 import org.apache.pekko
 import pekko.Done
 import pekko.actor.testkit.typed.scaladsl.LogCapturing
@@ -29,7 +30,7 @@ import pekko.actor.typed.Behavior
 import pekko.actor.typed.scaladsl.Behaviors
 import pekko.persistence.query.typed.EventEnvelope
 import pekko.persistence.r2dbc.Dialect
-import pekko.persistence.r2dbc.R2dbcSettings
+import pekko.persistence.r2dbc.JournalSettings
 import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
 import pekko.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
 import pekko.persistence.typed.PersistenceId
@@ -141,7 +142,7 @@ class EventSourcedEndToEndSpec
 
   private val log = LoggerFactory.getLogger(getClass)
 
-  private val journalSettings = new 
R2dbcSettings(system.settings.config.getConfig("pekko.persistence.r2dbc"))
+  private val querySettings = 
QuerySettings(system.settings.config.getConfig("pekko.persistence.r2dbc.query"))
   private val projectionSettings = R2dbcProjectionSettings(system)
   private val stringSerializer = 
SerializationExtension(system).serializerFor(classOf[String])
 
@@ -154,7 +155,7 @@ class EventSourcedEndToEndSpec
     log.debug("Write test event [{}] [{}] [{}] at time [{}]", persistenceId, 
seqNr: java.lang.Long, event, timestamp)
     implicit val dialect: Dialect = projectionSettings.dialect
     val insertEventSql = sql"""
-      INSERT INTO ${journalSettings.journalTableWithSchema}
+      INSERT INTO ${querySettings.journalTableWithSchema}
       (slice, entity_type, persistence_id, seq_nr, db_timestamp, writer, 
adapter_manifest, event_ser_id, event_ser_manifest, event_payload)
       VALUES (?, ?, ?, ?, ?, '', '', ?, '', ?)"""
 
@@ -323,13 +324,12 @@ class EventSourcedEndToEndSpec
 
       // pid3, seqNr 8 is missing (knows 7) when receiving 9
       writeEvent(pid3, 9L, startTime.plusMillis(4), "e3-9")
-      
processedProbe.expectNoMessage(journalSettings.querySettings.refreshInterval + 
2000.millis)
+      processedProbe.expectNoMessage(querySettings.refreshInterval + 
2000.millis)
 
       // but backtracking can fill in the gaps, backtracking will pick up pid3 
seqNr 8 and 9
       writeEvent(pid3, 8L, startTime.plusMillis(3), "e3-8")
       val possibleDelay =
-        journalSettings.querySettings.backtrackingBehindCurrentTime + 
journalSettings.querySettings
-          .refreshInterval + processedProbe.remainingOrDefault
+        querySettings.backtrackingBehindCurrentTime + 
querySettings.refreshInterval + processedProbe.remainingOrDefault
       processedProbe.receiveMessage(possibleDelay).envelope.event shouldBe 
"e3-8"
       processedProbe.receiveMessage(possibleDelay).envelope.event shouldBe 
"e3-9"
 
diff --git 
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedPubSubSpec.scala
 
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedPubSubSpec.scala
index 9c88477..306d2d8 100644
--- 
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedPubSubSpec.scala
+++ 
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedPubSubSpec.scala
@@ -43,23 +43,23 @@ import org.slf4j.LoggerFactory
 
 object EventSourcedPubSubSpec {
 
-  val config: Config = ConfigFactory
-    .parseString("""
-    pekko.persistence.r2dbc {
-      journal.publish-events = on
-      query {
+  val config: Config = ConfigFactory.load(
+    ConfigFactory
+      .parseString("""
+      pekko.persistence.r2dbc {
+        journal.publish-events = on
         refresh-interval = 3 seconds
-        # simulate lost messages by overflowing the buffer
-        buffer-size = 10
+          # simulate lost messages by overflowing the buffer
+          buffer-size = 10
 
-        backtracking {
-          behind-current-time = 5 seconds
-          window = 20 seconds
-        }
+          backtracking {
+            behind-current-time = 5 seconds
+            window = 20 seconds
+          }
       }
-    }
-    """)
-    .withFallback(TestConfig.config)
+      """)
+      .withFallback(TestConfig.unresolvedConfig)
+  )
 
   final case class Processed(projectionId: ProjectionId, envelope: 
EventEnvelope[String])
 
diff --git 
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/TestConfig.scala 
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/TestConfig.scala
index fc48aff..aadf9a4 100644
--- 
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/TestConfig.scala
+++ 
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/TestConfig.scala
@@ -17,7 +17,7 @@ import com.typesafe.config.Config
 import com.typesafe.config.ConfigFactory
 
 object TestConfig {
-  lazy val config: Config = {
+  lazy val unresolvedConfig: Config = {
     val defaultConfig = ConfigFactory.load()
     val dialect = defaultConfig.getString("pekko.projection.r2dbc.dialect")
 
@@ -62,7 +62,7 @@ object TestConfig {
     }
 
     // using load here so that connection-factory can be overridden
-    ConfigFactory.load(dialectConfig.withFallback(ConfigFactory.parseString("""
+    dialectConfig.withFallback(ConfigFactory.parseString("""
     pekko.persistence.journal.plugin = "pekko.persistence.r2dbc.journal"
     pekko.persistence.state.plugin = "pekko.persistence.r2dbc.state"
     pekko.persistence.r2dbc {
@@ -71,6 +71,8 @@ object TestConfig {
       }
     }
     pekko.actor.testkit.typed.default-timeout = 10s
-    """)))
+    """))
   }
+
+  lazy val config: Config = ConfigFactory.load(unresolvedConfig)
 }
diff --git 
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/TestDbLifecycle.scala
 
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/TestDbLifecycle.scala
index b794b97..552c5c6 100644
--- 
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/TestDbLifecycle.scala
+++ 
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/TestDbLifecycle.scala
@@ -15,12 +15,12 @@ package org.apache.pekko.projection.r2dbc
 
 import scala.concurrent.Await
 import scala.concurrent.duration._
-
 import org.apache.pekko
 import pekko.actor.typed.ActorSystem
 import pekko.persistence.Persistence
 import pekko.persistence.r2dbc.ConnectionFactoryProvider
-import pekko.persistence.r2dbc.R2dbcSettings
+import pekko.persistence.r2dbc.JournalSettings
+import pekko.persistence.r2dbc.StateSettings
 import pekko.persistence.r2dbc.internal.R2dbcExecutor
 import org.scalatest.BeforeAndAfterAll
 import org.scalatest.Suite
@@ -37,7 +37,11 @@ trait TestDbLifecycle extends BeforeAndAfterAll { this: 
Suite =>
 
   lazy val r2dbcExecutor: R2dbcExecutor = {
     new R2dbcExecutor(
-      
ConnectionFactoryProvider(typedSystem).connectionFactoryFor(r2dbcProjectionSettings.useConnectionFactory),
+      // use a separate connection factory so that the test harness does not 
initialize the connection factory of the plugin that is being tested
+      ConnectionFactoryProvider(typedSystem)
+        .connectionFactoryFor("test.connection-factory",
+          
typedSystem.settings.config.getConfig(r2dbcProjectionSettings.useConnectionFactory).atPath(
+            "test.connection-factory")),
       LoggerFactory.getLogger(getClass),
       
r2dbcProjectionSettings.logDbCallsExceeding)(typedSystem.executionContext, 
typedSystem)
   }
@@ -45,15 +49,17 @@ trait TestDbLifecycle extends BeforeAndAfterAll { this: 
Suite =>
   lazy val persistenceExt: Persistence = Persistence(typedSystem)
 
   override protected def beforeAll(): Unit = {
-    lazy val r2dbcSettings: R2dbcSettings =
-      new 
R2dbcSettings(typedSystem.settings.config.getConfig("pekko.persistence.r2dbc"))
+    lazy val journalSettings: JournalSettings =
+      new 
JournalSettings(typedSystem.settings.config.getConfig("pekko.persistence.r2dbc.journal"))
+    lazy val stateSettings: StateSettings =
+      new 
StateSettings(typedSystem.settings.config.getConfig("pekko.persistence.r2dbc.state"))
     Await.result(
       r2dbcExecutor.updateOne("beforeAll delete")(
-        _.createStatement(s"delete from 
${r2dbcSettings.journalTableWithSchema}")),
+        _.createStatement(s"delete from 
${journalSettings.journalTableWithSchema}")),
       10.seconds)
     Await.result(
       r2dbcExecutor.updateOne("beforeAll delete")(
-        _.createStatement(s"delete from 
${r2dbcSettings.durableStateTableWithSchema}")),
+        _.createStatement(s"delete from 
${stateSettings.durableStateTableWithSchema}")),
       10.seconds)
     if (r2dbcProjectionSettings.isOffsetTableDefined) {
       Await.result(


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]


Reply via email to