This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-r2dbc.git
The following commit(s) were added to refs/heads/main by this push:
new 6e7e034 Configuring persistence plugins at runtime (#194)
6e7e034 is described below
commit 6e7e0342ce956a845f16a4b99f04edb311dea625
Author: Domantas Petrauskas <[email protected]>
AuthorDate: Fri Aug 15 15:11:33 2025 +0300
Configuring persistence plugins at runtime (#194)
* Extract events by persistence ID functionality from read journal, make
journal and query plugins depend on it
* Rework plugin implementations to enable runtime plugin configuration
* Add manual lifecycle handling for connection pools created by connection
factory provider
* Simplify plugin settings for tables with schemas
* Remove redundant connection factory provider method
* Fix core tests
* Add runtime plugin config test
* Add query plugin assertions to runtime plugin config test
* Improve structure of runtime plugin config test
* Add state plugin test to runtime plugin config spec
* Clean up state test in runtime plugin config spec
* Make migration tool use journal connection factory settings
* WIP implement improved connection factory provider
* Implement improved connection factory provider
* Fix core tests
* Fix projection tests
* Fix migration tool
* Use separate connection factories for test harnesses
* Import and formatting fixes
* Update license
* Break up shared settings
* Break up shared config
* Remove unnecessary comments
* Add test for v1 config
* Add binary incompatibility filters
* Add documentation
* Fix documentation example compilation
* Fix documentation example compilation for Scala 3
* Fix license headers
* Refactor free specs to word specs
* Remove no longer relevant TODOs/FIXMEs
---
.github/workflows/build-test.yml | 12 +
.../r2dbcsettings.excludes | 20 ++
core/src/main/resources/reference.conf | 127 ++++++--
.../r2dbc/ConnectionFactoryProvider.scala | 42 +--
.../pekko/persistence/r2dbc/R2dbcSettings.scala | 211 +++++++++++---
.../persistence/r2dbc/internal/BySliceQuery.scala | 40 +--
.../r2dbc/internal/EventsByPersistenceIdDao.scala | 151 ++++++++++
.../r2dbc/internal/HighestSequenceNrDao.scala | 73 +++++
.../persistence/r2dbc/journal/JournalDao.scala | 70 ++---
.../persistence/r2dbc/journal/R2dbcJournal.scala | 15 +-
.../r2dbc/journal/mysql/MySQLJournalDao.scala | 15 +-
.../r2dbc/query/scaladsl/QueryDao.scala | 76 ++---
.../r2dbc/query/scaladsl/R2dbcReadJournal.scala | 82 +-----
.../r2dbc/query/scaladsl/mysql/MySQLQueryDao.scala | 8 +-
.../r2dbc/snapshot/R2dbcSnapshotStore.scala | 22 +-
.../persistence/r2dbc/snapshot/SnapshotDao.scala | 21 +-
.../r2dbc/snapshot/mysql/MySQLSnapshotDao.scala | 4 +-
.../r2dbc/state/scaladsl/DurableStateDao.scala | 27 +-
.../state/scaladsl/R2dbcDurableStateStore.scala | 10 +-
.../scaladsl/mysql/MySQLDurableStateDao.scala | 6 +-
.../resources/config-v1.conf} | 0
.../persistence/r2dbc/R2dbcSettingsSpec.scala | 64 ++--
.../pekko/persistence/r2dbc/TestConfig.scala | 19 +-
.../pekko/persistence/r2dbc/TestDbLifecycle.scala | 33 ++-
.../r2dbc/journal/PersistTagsSpec.scala | 7 +-
.../r2dbc/journal/PersistTimestampSpec.scala | 5 +-
.../r2dbc/journal/RuntimePluginConfigSpec.scala | 321 +++++++++++++++++++++
.../r2dbc/query/EventsByPersistenceIdSpec.scala | 2 -
.../query/EventsBySliceBacktrackingSpec.scala | 8 +-
.../r2dbc/query/EventsBySlicePubSubSpec.scala | 13 +-
.../r2dbc/query/EventsBySliceSpec.scala | 2 -
.../r2dbc/state/DurableStateBySliceSpec.scala | 2 -
.../r2dbc/state/DurableStateStoreSpec.scala | 2 +-
.../paradox/{connection-config.md => config.md} | 14 +-
docs/src/main/paradox/durable-state-store.md | 2 +-
docs/src/main/paradox/index.md | 2 +-
docs/src/main/paradox/journal.md | 2 +-
docs/src/main/paradox/projection.md | 2 +-
docs/src/main/paradox/query.md | 2 +-
docs/src/main/paradox/snapshots.md | 2 +-
.../docs/home/RuntimePluginConfigExample.scala | 79 +++++
.../r2dbc/migration/MigrationTool.scala | 23 +-
.../r2dbc/EventSourcedEndToEndSpec.scala | 12 +-
.../projection/r2dbc/EventSourcedPubSubSpec.scala | 28 +-
.../apache/pekko/projection/r2dbc/TestConfig.scala | 8 +-
.../pekko/projection/r2dbc/TestDbLifecycle.scala | 20 +-
46 files changed, 1256 insertions(+), 450 deletions(-)
diff --git a/.github/workflows/build-test.yml b/.github/workflows/build-test.yml
index 02b126a..a809c01 100644
--- a/.github/workflows/build-test.yml
+++ b/.github/workflows/build-test.yml
@@ -82,6 +82,10 @@ jobs:
# TODO: could we poll the port instead of sleep?
sleep 10
docker exec -i docker-postgres-db-1 psql -U postgres -t <
ddl-scripts/create_tables_postgres.sql
+ docker exec -i docker-postgres-db-1 psql -U postgres -t -c 'CREATE
DATABASE database1;'
+ docker exec -i docker-postgres-db-1 psql -U postgres -t -d database1
< ddl-scripts/create_tables_postgres.sql
+ docker exec -i docker-postgres-db-1 psql -U postgres -t -c 'CREATE
DATABASE database2;'
+ docker exec -i docker-postgres-db-1 psql -U postgres -t -d database2
< ddl-scripts/create_tables_postgres.sql
- name: test
run: sbt ++${{ matrix.SCALA_VERSION }} test
@@ -128,6 +132,10 @@ jobs:
# TODO: could we poll the port instead of sleep?
sleep 10
docker exec -i yb-tserver-n1 /home/yugabyte/bin/ysqlsh -h
yb-tserver-n1 -t < ddl-scripts/create_tables_yugabyte.sql
+ docker exec -i yb-tserver-n1 /home/yugabyte/bin/ysqlsh -h
yb-tserver-n1 -t -c 'CREATE DATABASE database1;'
+ docker exec -i yb-tserver-n1 /home/yugabyte/bin/ysqlsh -h
yb-tserver-n1 -t -d database1 < ddl-scripts/create_tables_yugabyte.sql
+ docker exec -i yb-tserver-n1 /home/yugabyte/bin/ysqlsh -h
yb-tserver-n1 -t -c 'CREATE DATABASE database2;'
+ docker exec -i yb-tserver-n1 /home/yugabyte/bin/ysqlsh -h
yb-tserver-n1 -t -d database2 < ddl-scripts/create_tables_yugabyte.sql
- name: test
run: sbt -Dpekko.persistence.r2dbc.dialect=yugabyte
-Dpekko.projection.r2dbc.dialect=yugabyte ++${{ matrix.SCALA_VERSION }} test
@@ -183,6 +191,10 @@ jobs:
run: |-
docker compose -f docker/docker-compose-mysql.yml up -d --wait
docker exec -i docker-mysql-db-1 mysql -h 127.0.0.1 --user=root
--password=root --database=mysql < ddl-scripts/create_tables_mysql.sql
+ docker exec -i docker-mysql-db-1 mysql -h 127.0.0.1 --user=root
--password=root -e 'CREATE SCHEMA database1;'
+ docker exec -i docker-mysql-db-1 mysql -h 127.0.0.1 --user=root
--password=root --database=database1 < ddl-scripts/create_tables_mysql.sql
+ docker exec -i docker-mysql-db-1 mysql -h 127.0.0.1 --user=root
--password=root -e 'CREATE SCHEMA database2;'
+ docker exec -i docker-mysql-db-1 mysql -h 127.0.0.1 --user=root
--password=root --database=database2 < ddl-scripts/create_tables_mysql.sql
- name: test
run: sbt -Dpekko.persistence.r2dbc.dialect=mysql ++${{
matrix.SCALA_VERSION }} ${{ matrix.COMPILE_ONLY && 'Test/compile' || 'test' }}
diff --git
a/core/src/main/mima-filters/1.1.x.backwards.excludes/r2dbcsettings.excludes
b/core/src/main/mima-filters/1.1.x.backwards.excludes/r2dbcsettings.excludes
new file mode 100644
index 0000000..e36b41f
--- /dev/null
+++ b/core/src/main/mima-filters/1.1.x.backwards.excludes/r2dbcsettings.excludes
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+# Caused by https://github.com/apache/pekko-persistence-r2dbc/pull/194
+ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.persistence.r2dbc.R2dbcSettings")
+ProblemFilters.exclude[MissingClassProblem]("org.apache.pekko.persistence.r2dbc.R2dbcSettings$")
diff --git a/core/src/main/resources/reference.conf
b/core/src/main/resources/reference.conf
index 1419a80..ba1eec8 100644
--- a/core/src/main/resources/reference.conf
+++ b/core/src/main/resources/reference.conf
@@ -11,8 +11,6 @@ pekko.persistence.r2dbc {
# Otherwise it would be a pinned dispatcher, see
https://github.com/akka/akka/issues/31058
plugin-dispatcher = "pekko.actor.default-dispatcher"
- # event replay is using pekko.persistence.r2dbc.query.buffer-size
-
# Enable this to reduce latency of eventsBySlices. The persisted events
will be
# published as Pekko messages and consumed directly by running
eventsBySlices
# queries. Tradeoff is more CPU and network resources that are used. The
events
@@ -22,6 +20,20 @@ pekko.persistence.r2dbc {
# replay filter not needed for this plugin
replay-filter.mode = off
+
+ dialect = ${pekko.persistence.r2dbc.dialect}
+
+ schema = ${pekko.persistence.r2dbc.schema}
+
+ use-connection-factory = ${pekko.persistence.r2dbc.use-connection-factory}
+
+ log-db-calls-exceeding = ${pekko.persistence.r2dbc.log-db-calls-exceeding}
+
+ buffer-size = ${pekko.persistence.r2dbc.buffer-size}
+
+ db-timestamp-monotonic-increasing =
${pekko.persistence.r2dbc.db-timestamp-monotonic-increasing}
+
+ use-app-timestamp = ${pekko.persistence.r2dbc.use-app-timestamp}
}
}
// #journal-settings
@@ -34,6 +46,14 @@ pekko.persistence.r2dbc {
# Otherwise it would be a pinned dispatcher, see
https://github.com/akka/akka/issues/31058
plugin-dispatcher = "pekko.actor.default-dispatcher"
+
+ dialect = ${pekko.persistence.r2dbc.dialect}
+
+ schema = ${pekko.persistence.r2dbc.schema}
+
+ use-connection-factory = ${pekko.persistence.r2dbc.use-connection-factory}
+
+ log-db-calls-exceeding = ${pekko.persistence.r2dbc.log-db-calls-exceeding}
}
}
// #snapshot-settings
@@ -50,6 +70,31 @@ pekko.persistence.r2dbc {
# previous revision. There might be a small performance gain if
# this is disabled.
assert-single-writer = on
+
+ dialect = ${pekko.persistence.r2dbc.dialect}
+
+ schema = ${pekko.persistence.r2dbc.schema}
+
+ use-connection-factory = ${pekko.persistence.r2dbc.use-connection-factory}
+
+ log-db-calls-exceeding = ${pekko.persistence.r2dbc.log-db-calls-exceeding}
+
+ buffer-size = ${pekko.persistence.r2dbc.buffer-size}
+
+ refresh-interval = ${pekko.persistence.r2dbc.refresh-interval}
+
+ behind-current-time = ${pekko.persistence.r2dbc.behind-current-time}
+ backtracking {
+ enabled = ${pekko.persistence.r2dbc.backtracking.enabled}
+ window = ${pekko.persistence.r2dbc.backtracking.window}
+ behind-current-time =
${pekko.persistence.r2dbc.backtracking.behind-current-time}
+ }
+
+ db-timestamp-monotonic-increasing =
${pekko.persistence.r2dbc.db-timestamp-monotonic-increasing}
+
+ use-app-timestamp = ${pekko.persistence.r2dbc.use-app-timestamp}
+
+ persistence-ids.buffer-size =
${pekko.persistence.r2dbc.persistence-ids.buffer-size}
}
}
// #durable-state-settings
@@ -59,32 +104,9 @@ pekko.persistence.r2dbc {
query {
class = "org.apache.pekko.persistence.r2dbc.query.R2dbcReadJournalProvider"
- # When live queries return no results or <= 10% of buffer-size, the next
query
- # to db will be delayed for this duration.
- # When the number of rows from previous query is >= 90% of buffer-size,
the next
- # query will be emitted immediately.
- # Otherwise, between 10% - 90% of buffer-size, the next query will be
delayed
- # for half of this duration.
- refresh-interval = 3s
-
- # Live queries read events up to this duration from the current database
time.
- behind-current-time = 100 millis
-
- backtracking {
- enabled = on
- # Backtracking queries will look back for this amount of time. It should
- # not be larger than the pekko.projection.r2dbc.offset-store.time-window.
- window = 2 minutes
- # Backtracking queries read events up to this duration from the current
database time.
- behind-current-time = 10 seconds
- }
+ table = ${pekko.persistence.r2dbc.journal.table}
- # In-memory buffer holding events when reading from database.
- buffer-size = 1000
-
- persistence-ids {
- buffer-size = 1000
- }
+ publish-events = ${pekko.persistence.r2dbc.journal.publish-events}
# When journal publish-events is enabled a best effort deduplication can
be enabled by setting
# this property to the size of the deduplication buffer in the
`eventsBySlices` query.
@@ -94,6 +116,26 @@ pekko.persistence.r2dbc {
# the backtracking queries.
deduplicate-capacity = 0
+ dialect = ${pekko.persistence.r2dbc.dialect}
+
+ schema = ${pekko.persistence.r2dbc.schema}
+
+ use-connection-factory = ${pekko.persistence.r2dbc.use-connection-factory}
+
+ log-db-calls-exceeding = ${pekko.persistence.r2dbc.log-db-calls-exceeding}
+
+ buffer-size = ${pekko.persistence.r2dbc.buffer-size}
+
+ refresh-interval = ${pekko.persistence.r2dbc.refresh-interval}
+
+ behind-current-time = ${pekko.persistence.r2dbc.behind-current-time}
+ backtracking {
+ enabled = ${pekko.persistence.r2dbc.backtracking.enabled}
+ window = ${pekko.persistence.r2dbc.backtracking.window}
+ behind-current-time =
${pekko.persistence.r2dbc.backtracking.behind-current-time}
+ }
+
+ persistence-ids.buffer-size =
${pekko.persistence.r2dbc.persistence-ids.buffer-size}
}
}
// #query-settings
@@ -177,6 +219,11 @@ pekko.persistence.r2dbc {
connection-factory-options-customizer = ""
}
+ # Fully qualified config path which holds the connection factory
configuration.
+ # Connection factories are initialized using the config at this path and are
identified by the value of this path.
+ # All persistence plugins use the same value by default, which allows
sharing of single connection factory between all of the plugins.
+ use-connection-factory = "pekko.persistence.r2dbc.connection-factory"
+
# If database timestamp is guaranteed to not move backwards for two
subsequent
# updates of the same persistenceId there might be a performance gain to
# set this to `on`. Note that many databases use the system clock and that
can
@@ -192,5 +239,31 @@ pekko.persistence.r2dbc {
# Set to 0 to log all calls.
log-db-calls-exceeding = 300 ms
+ # In-memory buffer holding events when reading from database.
+ buffer-size = 1000
+
+ # When live queries return no results or <= 10% of buffer-size, the next
query
+ # to db will be delayed for this duration.
+ # When the number of rows from previous query is >= 90% of buffer-size, the
next
+ # query will be emitted immediately.
+ # Otherwise, between 10% - 90% of buffer-size, the next query will be delayed
+ # for half of this duration.
+ refresh-interval = 3s
+
+ # Live queries read events up to this duration from the current database
time.
+ behind-current-time = 100 millis
+
+ backtracking {
+ enabled = on
+ # Backtracking queries will look back for this amount of time. It should
+ # not be larger than the pekko.projection.r2dbc.offset-store.time-window.
+ window = 2 minutes
+ # Backtracking queries read events up to this duration from the current
database time.
+ behind-current-time = 10 seconds
+ }
+
+ persistence-ids {
+ buffer-size = 1000
+ }
}
// #connection-settings
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/ConnectionFactoryProvider.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/ConnectionFactoryProvider.scala
index 612e765..b6a5e5d 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/ConnectionFactoryProvider.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/ConnectionFactoryProvider.scala
@@ -18,16 +18,8 @@ import java.util.concurrent.ConcurrentHashMap
import scala.concurrent.Future
import scala.concurrent.duration.Duration
-import scala.util.{ Failure, Success }
-import com.typesafe.config.Config
-import io.r2dbc.pool.ConnectionPool
-import io.r2dbc.pool.ConnectionPoolConfiguration
-import io.r2dbc.postgresql.PostgresqlConnectionFactoryProvider
-import io.r2dbc.postgresql.client.SSLMode
-import io.r2dbc.spi.ConnectionFactories
-import io.r2dbc.spi.ConnectionFactory
-import io.r2dbc.spi.ConnectionFactoryOptions
-import io.r2dbc.spi.Option
+import scala.util.Failure
+import scala.util.Success
import org.apache.pekko
import pekko.Done
import pekko.actor.CoordinatedShutdown
@@ -36,8 +28,18 @@ import pekko.actor.typed.Extension
import pekko.actor.typed.ExtensionId
import
pekko.persistence.r2dbc.ConnectionFactoryProvider.ConnectionFactoryOptionsCustomizer
import pekko.persistence.r2dbc.ConnectionFactoryProvider.NoopCustomizer
-import pekko.persistence.r2dbc.internal.R2dbcExecutor
+import pekko.persistence.r2dbc.internal.R2dbcExecutor.PublisherOps
import pekko.util.ccompat.JavaConverters._
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
+import io.r2dbc.pool.ConnectionPool
+import io.r2dbc.pool.ConnectionPoolConfiguration
+import io.r2dbc.postgresql.PostgresqlConnectionFactoryProvider
+import io.r2dbc.postgresql.client.SSLMode
+import io.r2dbc.spi.ConnectionFactories
+import io.r2dbc.spi.ConnectionFactory
+import io.r2dbc.spi.ConnectionFactoryOptions
+import io.r2dbc.spi.Option
object ConnectionFactoryProvider extends
ExtensionId[ConnectionFactoryProvider] {
def createExtension(system: ActorSystem[_]): ConnectionFactoryProvider = new
ConnectionFactoryProvider(system)
@@ -75,7 +77,6 @@ object ConnectionFactoryProvider extends
ExtensionId[ConnectionFactoryProvider]
class ConnectionFactoryProvider(system: ActorSystem[_]) extends Extension {
- import R2dbcExecutor.PublisherOps
private val sessions = new ConcurrentHashMap[String, ConnectionPool]
CoordinatedShutdown(system)
@@ -86,15 +87,20 @@ class ConnectionFactoryProvider(system: ActorSystem[_])
extends Extension {
.map(_ => Done)
}
- def connectionFactoryFor(configLocation: String): ConnectionFactory = {
+ def connectionFactoryFor(configPath: String): ConnectionFactory = {
+ connectionFactoryFor(configPath, ConfigFactory.empty())
+ }
+
+ def connectionFactoryFor(configPath: String, config: Config):
ConnectionFactory = {
sessions
.computeIfAbsent(
- configLocation,
- configLocation => {
- val config = system.settings.config.getConfig(configLocation)
- val settings = new ConnectionFactorySettings(config)
+ configPath,
+ _ => {
+ val fullConfig = config.withFallback(system.settings.config)
+ val settings =
+ new ConnectionFactorySettings(fullConfig.getConfig(configPath))
val customizer = createConnectionFactoryOptionsCustomizer(settings)
- createConnectionPoolFactory(settings, customizer, config)
+ createConnectionPoolFactory(settings, customizer, fullConfig)
})
.asInstanceOf[ConnectionFactory]
}
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala
index 5cbfbff..9a67a80 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettings.scala
@@ -16,107 +16,219 @@ package org.apache.pekko.persistence.r2dbc
import java.util.Locale
import scala.concurrent.duration._
-
import org.apache.pekko
import pekko.annotation.InternalApi
import pekko.annotation.InternalStableApi
+import pekko.util.Helpers.toRootLowerCase
import pekko.util.JavaDurationConverters._
import com.typesafe.config.Config
-import pekko.util.Helpers.toRootLowerCase
/**
* INTERNAL API
*/
@InternalStableApi
-object R2dbcSettings {
- def apply(config: Config): R2dbcSettings =
- new R2dbcSettings(config)
+sealed trait Dialect
+
+/**
+ * INTERNAL API
+ */
+@InternalStableApi
+object Dialect {
+ case object Postgres extends Dialect
+ case object Yugabyte extends Dialect
+
+ /** @since 1.1.0 */
+ case object MySQL extends Dialect
+
+ /** @since 1.1.0 */
+ def fromString(value: String): Dialect = {
+ toRootLowerCase(value) match {
+ case "yugabyte" => Dialect.Yugabyte
+ case "postgres" => Dialect.Postgres
+ case "mysql" => Dialect.MySQL
+ case other =>
+ throw new IllegalArgumentException(
+ s"Unknown dialect [$other]. Supported dialects are [yugabyte,
postgres, mysql].")
+ }
+ }
}
/**
* INTERNAL API
*/
@InternalStableApi
-final class R2dbcSettings(config: Config) {
- val schema: Option[String] =
Option(config.getString("schema")).filterNot(_.trim.isEmpty)
+final class JournalSettings(val config: Config) extends ConnectionSettings
with UseConnnectionFactory with BufferSize
+ with JournalPublishEvents with DbTimestampMonotonicIncreasing with
UseAppTimestamp {
- val journalTable: String = config.getString("journal.table")
+ val journalTable: String = config.getString("table")
val journalTableWithSchema: String = schema.map(_ + ".").getOrElse("") +
journalTable
+}
- val journalPublishEvents: Boolean =
config.getBoolean("journal.publish-events")
+/**
+ * INTERNAL API
+ */
+@InternalStableApi
+object JournalSettings {
+ def apply(config: Config): JournalSettings =
+ new JournalSettings(config)
+}
+
+/**
+ * INTERNAL API
+ */
+@InternalStableApi
+final class SnapshotSettings(val config: Config) extends ConnectionSettings
with UseConnnectionFactory {
- val snapshotsTable: String = config.getString("snapshot.table")
+ val snapshotsTable: String = config.getString("table")
val snapshotsTableWithSchema: String = schema.map(_ + ".").getOrElse("") +
snapshotsTable
+}
+
+/**
+ * INTERNAL API
+ */
+@InternalStableApi
+object SnapshotSettings {
+ def apply(config: Config): SnapshotSettings =
+ new SnapshotSettings(config)
+}
- val durableStateTable: String = config.getString("state.table")
+/**
+ * INTERNAL API
+ */
+@InternalStableApi
+final class StateSettings(val config: Config) extends ConnectionSettings with
UseConnnectionFactory with BufferSize
+ with RefreshInterval with BySliceQuerySettings with
DbTimestampMonotonicIncreasing with PersistenceIdsQuerySettings
+ with UseAppTimestamp {
+
+ val durableStateTable: String = config.getString("table")
val durableStateTableWithSchema: String = schema.map(_ + ".").getOrElse("")
+ durableStateTable
- val durableStateAssertSingleWriter: Boolean =
config.getBoolean("state.assert-single-writer")
+ val durableStateAssertSingleWriter: Boolean =
config.getBoolean("assert-single-writer")
+}
- val dialect: Dialect = Dialect.fromString(config.getString("dialect"))
+/**
+ * INTERNAL API
+ */
+@InternalStableApi
+object StateSettings {
+ def apply(config: Config): StateSettings =
+ new StateSettings(config)
+}
- val querySettings = new QuerySettings(config.getConfig("query"))
+/**
+ * INTERNAL API
+ */
+@InternalStableApi
+final class QuerySettings(val config: Config) extends ConnectionSettings with
UseConnnectionFactory with BufferSize
+ with RefreshInterval with BySliceQuerySettings with JournalPublishEvents
with PersistenceIdsQuerySettings {
- val connectionFactorySettings = new
ConnectionFactorySettings(config.getConfig("connection-factory"))
+ val journalTable: String = config.getString("table")
+ val journalTableWithSchema: String = schema.map(_ + ".").getOrElse("") +
journalTable
- val dbTimestampMonotonicIncreasing: Boolean =
config.getBoolean("db-timestamp-monotonic-increasing")
+ val deduplicateCapacity: Int = config.getInt("deduplicate-capacity")
+}
- /**
- * INTERNAL API FIXME remove when
https://github.com/yugabyte/yugabyte-db/issues/10995 has been resolved
- */
- @InternalApi private[pekko] val useAppTimestamp: Boolean =
config.getBoolean("use-app-timestamp")
+/**
+ * INTERNAL API
+ */
+@InternalStableApi
+object QuerySettings {
+ def apply(config: Config): QuerySettings =
+ new QuerySettings(config)
+}
+
+/**
+ * INTERNAL API
+ */
+@InternalStableApi
+trait ConnectionSettings {
+ def config: Config
+
+ val dialect: Dialect = Dialect.fromString(config.getString("dialect"))
+
+ val schema: Option[String] =
Option(config.getString("schema")).filterNot(_.trim.isEmpty)
val logDbCallsExceeding: FiniteDuration =
config.getString("log-db-calls-exceeding").toLowerCase(Locale.ROOT) match {
case "off" => -1.millis
case _ => config.getDuration("log-db-calls-exceeding").asScala
}
+}
+/**
+ * INTERNAL API
+ */
+@InternalStableApi
+trait JournalPublishEvents {
+ def config: Config
+
+ val journalPublishEvents: Boolean = config.getBoolean("publish-events")
}
/**
* INTERNAL API
*/
@InternalStableApi
-sealed trait Dialect
+trait DbTimestampMonotonicIncreasing {
+ def config: Config
+
+ val dbTimestampMonotonicIncreasing: Boolean =
config.getBoolean("db-timestamp-monotonic-increasing")
+}
/**
* INTERNAL API
*/
@InternalStableApi
-object Dialect {
- case object Postgres extends Dialect
- case object Yugabyte extends Dialect
+trait UseAppTimestamp {
+ def config: Config
- /** @since 1.1.0 */
- case object MySQL extends Dialect
+ /**
+ * INTERNAL API FIXME remove when
https://github.com/yugabyte/yugabyte-db/issues/10995 has been resolved
+ */
+ @InternalApi private[pekko] val useAppTimestamp: Boolean =
config.getBoolean("use-app-timestamp")
+}
- /** @since 1.1.0 */
- def fromString(value: String): Dialect = {
- toRootLowerCase(value) match {
- case "yugabyte" => Dialect.Yugabyte
- case "postgres" => Dialect.Postgres
- case "mysql" => Dialect.MySQL
- case other =>
- throw new IllegalArgumentException(
- s"Unknown dialect [$other]. Supported dialects are [yugabyte,
postgres, mysql].")
- }
- }
+/**
+ * INTERNAL API
+ */
+@InternalStableApi
+trait BufferSize {
+ def config: Config
+
+ val bufferSize: Int = config.getInt("buffer-size")
}
/**
* INTERNAL API
*/
@InternalStableApi
-final class QuerySettings(config: Config) {
+trait RefreshInterval {
+ def config: Config
+
val refreshInterval: FiniteDuration =
config.getDuration("refresh-interval").asScala
+}
+
+/**
+ * INTERNAL API
+ */
+@InternalStableApi
+trait BySliceQuerySettings {
+ def config: Config
+
val behindCurrentTime: FiniteDuration =
config.getDuration("behind-current-time").asScala
val backtrackingEnabled: Boolean = config.getBoolean("backtracking.enabled")
val backtrackingWindow: FiniteDuration =
config.getDuration("backtracking.window").asScala
val backtrackingBehindCurrentTime: FiniteDuration =
config.getDuration("backtracking.behind-current-time").asScala
- val bufferSize: Int = config.getInt("buffer-size")
+}
+
+/**
+ * INTERNAL API
+ */
+@InternalStableApi
+trait PersistenceIdsQuerySettings {
+ def config: Config
+
val persistenceIdsBufferSize: Int =
config.getInt("persistence-ids.buffer-size")
- val deduplicateCapacity: Int = config.getInt("deduplicate-capacity")
}
/**
@@ -156,3 +268,22 @@ final class ConnectionFactorySettings(config: Config) {
val connectionFactoryOptionsCustomizer: Option[String] =
Option(config.getString("connection-factory-options-customizer")).filter(_.trim.nonEmpty)
}
+
+/**
+ * INTERNAL API
+ */
+@InternalStableApi
+object ConnectionFactorySettings {
+ def apply(config: Config): ConnectionFactorySettings =
+ new ConnectionFactorySettings(config)
+}
+
+/**
+ * INTERNAL API
+ */
+@InternalStableApi
+trait UseConnnectionFactory {
+ def config: Config
+
+ val useConnectionFactory: String = config.getString("use-connection-factory")
+}
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQuery.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQuery.scala
index cbd2f70..f3bacbc 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQuery.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/BySliceQuery.scala
@@ -13,22 +13,23 @@
package org.apache.pekko.persistence.r2dbc.internal
-import scala.collection.immutable
import java.time.Instant
import java.time.{ Duration => JDuration }
import scala.annotation.tailrec
+import scala.collection.immutable
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration
-
import org.apache.pekko
import pekko.NotUsed
import pekko.annotation.InternalApi
import pekko.persistence.query.Offset
import pekko.persistence.query.TimestampOffset
-import pekko.persistence.r2dbc.R2dbcSettings
+import pekko.persistence.r2dbc.BufferSize
+import pekko.persistence.r2dbc.BySliceQuerySettings
+import pekko.persistence.r2dbc.RefreshInterval
import pekko.persistence.r2dbc.internal.BySliceQuery.Buckets.Bucket
import pekko.stream.scaladsl.Flow
import pekko.stream.scaladsl.Source
@@ -98,7 +99,10 @@ import org.slf4j.Logger
* Key is the epoch seconds for the start of the bucket. Value is the
number of entries in the bucket.
*/
class Buckets(countByBucket: immutable.SortedMap[Buckets.EpochSeconds,
Buckets.Count]) {
- import Buckets.{ Bucket, BucketDurationSeconds, Count, EpochSeconds }
+ import Buckets.Bucket
+ import Buckets.BucketDurationSeconds
+ import Buckets.Count
+ import Buckets.EpochSeconds
val createdAt: Instant = Instant.now()
@@ -189,15 +193,15 @@ import org.slf4j.Logger
dao: BySliceQuery.Dao[Row],
createEnvelope: (TimestampOffset, Row) => Envelope,
extractOffset: Envelope => TimestampOffset,
- settings: R2dbcSettings,
+ settings: BySliceQuerySettings with RefreshInterval with BufferSize, //
use `with` to mix the settings classes
log: Logger)(implicit val ec: ExecutionContext) {
import BySliceQuery._
import TimestampOffset.toTimestampOffset
- private val backtrackingWindow =
JDuration.ofMillis(settings.querySettings.backtrackingWindow.toMillis)
+ private val backtrackingWindow =
JDuration.ofMillis(settings.backtrackingWindow.toMillis)
private val halfBacktrackingWindow = backtrackingWindow.dividedBy(2)
private val firstBacktrackingQueryWindow =
-
backtrackingWindow.plus(JDuration.ofMillis(settings.querySettings.backtrackingBehindCurrentTime.toMillis))
+
backtrackingWindow.plus(JDuration.ofMillis(settings.backtrackingBehindCurrentTime.toMillis))
private val eventBucketCountInterval = JDuration.ofSeconds(60)
def currentBySlices(
@@ -218,7 +222,7 @@ import org.slf4j.Logger
if (state.queryCount == 0L || state.rowCount > 0) {
val newState = state.copy(rowCount = 0, queryCount = state.queryCount
+ 1)
- val toTimestamp =
newState.nextQueryToTimestamp(settings.querySettings.bufferSize) match {
+ val toTimestamp = newState.nextQueryToTimestamp(settings.bufferSize)
match {
case Some(t) =>
if (t.isBefore(endTimestamp)) t else endTimestamp
case None =>
@@ -323,8 +327,8 @@ import org.slf4j.Logger
} else {
val delay = ContinuousQuery.adjustNextDelay(
state.rowCount,
- settings.querySettings.bufferSize,
- settings.querySettings.refreshInterval)
+ settings.bufferSize,
+ settings.refreshInterval)
if (log.isDebugEnabled)
delay.foreach { d =>
@@ -342,13 +346,13 @@ import org.slf4j.Logger
}
def switchFromBacktracking(state: QueryState): Boolean = {
- state.backtracking && state.rowCount < settings.querySettings.bufferSize
- 1
+ state.backtracking && state.rowCount < settings.bufferSize - 1
}
def nextQuery(state: QueryState): (QueryState, Option[Source[Envelope,
NotUsed]]) = {
val newIdleCount = if (state.rowCount == 0) state.idleCount + 1 else 0
val newState =
- if (settings.querySettings.backtrackingEnabled && !state.backtracking
&& state.latest != TimestampOffset.Zero &&
+ if (settings.backtrackingEnabled && !state.backtracking &&
state.latest != TimestampOffset.Zero &&
(newIdleCount >= 5 || JDuration
.between(state.latestBacktracking.timestamp,
state.latest.timestamp)
.compareTo(halfBacktrackingWindow) > 0)) {
@@ -376,11 +380,11 @@ import org.slf4j.Logger
}
val behindCurrentTime =
- if (newState.backtracking)
settings.querySettings.backtrackingBehindCurrentTime
- else settings.querySettings.behindCurrentTime
+ if (newState.backtracking) settings.backtrackingBehindCurrentTime
+ else settings.behindCurrentTime
val fromTimestamp = newState.nextQueryFromTimestamp
- val toTimestamp =
newState.nextQueryToTimestamp(settings.querySettings.bufferSize)
+ val toTimestamp = newState.nextQueryToTimestamp(settings.bufferSize)
if (log.isDebugEnabled())
log.debug(
@@ -431,7 +435,7 @@ import org.slf4j.Logger
// For Durable State we always refresh the bucket counts at the
interval. For Event Sourced we know
// that they don't change because events are append only.
(dao.countBucketsMayChange || state.buckets
- .findTimeForLimit(state.latest.timestamp,
settings.querySettings.bufferSize)
+ .findTimeForLimit(state.latest.timestamp, settings.bufferSize)
.isEmpty)) {
val fromTimestamp =
@@ -475,9 +479,9 @@ import org.slf4j.Logger
if (row.dbTimestamp == currentTimestamp) {
// has this already been seen?
if (currentSequenceNrs.get(row.persistenceId).exists(_ >=
row.seqNr)) {
- if (currentSequenceNrs.size >= settings.querySettings.bufferSize) {
+ if (currentSequenceNrs.size >= settings.bufferSize) {
throw new IllegalStateException(
- s"Too many events stored with the same timestamp
[$currentTimestamp], buffer size [${settings.querySettings.bufferSize}]")
+ s"Too many events stored with the same timestamp
[$currentTimestamp], buffer size [${settings.bufferSize}]")
}
log.trace(
"filtering [{}] [{}] as db timestamp is the same as last offset
and is in seen [{}]",
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/EventsByPersistenceIdDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/EventsByPersistenceIdDao.scala
new file mode 100644
index 0000000..4099091
--- /dev/null
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/EventsByPersistenceIdDao.scala
@@ -0,0 +1,151 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.r2dbc.internal
+
+import java.time.Instant
+
+import org.apache.pekko
+import pekko.NotUsed
+import pekko.annotation.InternalApi
+import pekko.persistence.r2dbc.BufferSize
+import pekko.persistence.r2dbc.Dialect
+import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
+import pekko.persistence.r2dbc.journal.JournalDao.SerializedJournalRow
+import pekko.persistence.r2dbc.journal.JournalDao.readMetadata
+import pekko.stream.scaladsl.Source
+import org.slf4j.LoggerFactory
+import scala.concurrent.ExecutionContext
+
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[r2dbc] object EventsByPersistenceIdDao {
+
+ private val log = LoggerFactory.getLogger(classOf[EventsByPersistenceIdDao])
+
+ private final case class ByPersistenceIdState(queryCount: Int, rowCount:
Int, latestSeqNr: Long)
+}
+
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[r2dbc] trait EventsByPersistenceIdDao {
+ import EventsByPersistenceIdDao._
+
+ implicit protected def ec: ExecutionContext
+
+ implicit protected def dialect: Dialect
+ protected def statementTimestampSql: String
+
+ protected def journalTable: String
+
+ protected def r2dbcExecutor: R2dbcExecutor
+
+ protected def settings: BufferSize
+
+ private lazy val selectEventsSql = sql"""
+ SELECT slice, entity_type, persistence_id, seq_nr, db_timestamp,
$statementTimestampSql AS read_db_timestamp, event_ser_id, event_ser_manifest,
event_payload, writer, adapter_manifest, meta_ser_id, meta_ser_manifest,
meta_payload
+ from $journalTable
+ WHERE persistence_id = ? AND seq_nr >= ? AND seq_nr <= ?
+ AND deleted = false
+ ORDER BY seq_nr
+ LIMIT ?"""
+
+ /**
+ * INTERNAL API: Used by both journal replay and currentEventsByPersistenceId
+ */
+ @InternalApi private[r2dbc] def internalEventsByPersistenceId(
+ persistenceId: String,
+ fromSequenceNr: Long,
+ toSequenceNr: Long): Source[SerializedJournalRow, NotUsed] = {
+ def updateState(state: ByPersistenceIdState, row: SerializedJournalRow):
ByPersistenceIdState =
+ state.copy(rowCount = state.rowCount + 1, latestSeqNr = row.seqNr)
+
+ def nextQuery(
+ state: ByPersistenceIdState,
+ highestSeqNr: Long): (ByPersistenceIdState,
Option[Source[SerializedJournalRow, NotUsed]]) = {
+ if (state.queryCount == 0L || state.rowCount >= settings.bufferSize) {
+        val newState = state.copy(rowCount = 0, queryCount = state.queryCount + 1)
+
+ if (state.queryCount != 0 && log.isDebugEnabled())
+ log.debug(
+ "currentEventsByPersistenceId query [{}] for persistenceId [{}],
from [{}] to [{}]. Found [{}] rows in previous query.",
+ state.queryCount: java.lang.Integer,
+ persistenceId,
+ state.latestSeqNr + 1: java.lang.Long,
+ highestSeqNr: java.lang.Long,
+ state.rowCount: java.lang.Integer)
+
+ newState -> Some(eventsByPersistenceId(persistenceId,
state.latestSeqNr + 1, highestSeqNr))
+ } else {
+ log.debug(
+ "currentEventsByPersistenceId query [{}] for persistenceId [{}]
completed. Found [{}] rows in previous query.",
+ state.queryCount: java.lang.Integer,
+ persistenceId,
+ state.rowCount: java.lang.Integer)
+
+ state -> None
+ }
+ }
+
+ if (log.isDebugEnabled())
+ log.debug(
+ "currentEventsByPersistenceId query for persistenceId [{}], from [{}]
to [{}].",
+ persistenceId,
+ fromSequenceNr: java.lang.Long,
+ toSequenceNr: java.lang.Long)
+
+ ContinuousQuery[ByPersistenceIdState, SerializedJournalRow](
+ initialState = ByPersistenceIdState(0, 0, latestSeqNr = fromSequenceNr -
1),
+ updateState = updateState,
+ delayNextQuery = _ => None,
+ nextQuery = state => nextQuery(state, toSequenceNr))
+ }
+
+ def eventsByPersistenceId(
+ persistenceId: String,
+ fromSequenceNr: Long,
+ toSequenceNr: Long): Source[SerializedJournalRow, NotUsed] = {
+
+ val result = r2dbcExecutor.select(s"select eventsByPersistenceId
[$persistenceId]")(
+ connection =>
+ connection
+ .createStatement(selectEventsSql)
+ .bind(0, persistenceId)
+ .bind(1, fromSequenceNr)
+ .bind(2, toSequenceNr)
+ .bind(3, settings.bufferSize),
+ row =>
+ SerializedJournalRow(
+ slice = row.get[Integer]("slice", classOf[Integer]),
+ entityType = row.get("entity_type", classOf[String]),
+ persistenceId = row.get("persistence_id", classOf[String]),
+ seqNr = row.get[java.lang.Long]("seq_nr", classOf[java.lang.Long]),
+ dbTimestamp = row.get("db_timestamp", classOf[Instant]),
+ readDbTimestamp = row.get("read_db_timestamp", classOf[Instant]),
+ payload = Some(row.get("event_payload", classOf[Array[Byte]])),
+ serId = row.get[Integer]("event_ser_id", classOf[Integer]),
+ serManifest = row.get("event_ser_manifest", classOf[String]),
+ writerUuid = row.get("writer", classOf[String]),
+ tags = Set.empty, // tags not fetched in queries (yet)
+ metadata = readMetadata(row)))
+
+ if (log.isDebugEnabled)
+ result.foreach(rows => log.debug("Read [{}] events for persistenceId
[{}]", rows.size, persistenceId))
+
+ Source.futureSource(result.map(Source(_))).mapMaterializedValue(_ =>
NotUsed)
+ }
+}
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/HighestSequenceNrDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/HighestSequenceNrDao.scala
new file mode 100644
index 0000000..b1aa52f
--- /dev/null
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/internal/HighestSequenceNrDao.scala
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2021 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.r2dbc.internal
+
+import org.apache.pekko
+import pekko.annotation.InternalApi
+import pekko.dispatch.ExecutionContexts
+import pekko.persistence.r2dbc.Dialect
+import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
+import org.slf4j.LoggerFactory
+
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
+
+/**
+ * INTERNAL API
+ */
+@InternalApi
+private[r2dbc] object HighestSequenceNrDao {
+
+ private val log = LoggerFactory.getLogger(classOf[HighestSequenceNrDao])
+}
+
+/**
+ * INTERNAL API
+ */
+@InternalApi
+trait HighestSequenceNrDao {
+ import HighestSequenceNrDao._
+
+ implicit protected def ec: ExecutionContext
+
+ implicit protected def dialect: Dialect
+
+ protected def journalTable: String
+
+ protected def r2dbcExecutor: R2dbcExecutor
+
+ private lazy val selectHighestSequenceNrSql = sql"""
+ SELECT MAX(seq_nr) from $journalTable
+ WHERE persistence_id = ? AND seq_nr >= ?"""
+
+ def readHighestSequenceNr(persistenceId: String, fromSequenceNr: Long):
Future[Long] = {
+ val result = r2dbcExecutor
+ .select(s"select highest seqNr [$persistenceId]")(
+ connection =>
+ connection
+ .createStatement(selectHighestSequenceNrSql)
+ .bind(0, persistenceId)
+ .bind(1, fromSequenceNr),
+ row => {
+ val seqNr = row.get[java.lang.Long](0, classOf[java.lang.Long])
+ if (seqNr eq null) 0L else seqNr.longValue
+ })
+ .map(r => if (r.isEmpty) 0L else r.head)(ExecutionContexts.parasitic)
+
+ if (log.isDebugEnabled)
+ result.foreach(seqNr => log.debug("Highest sequence nr for persistenceId
[{}]: [{}]", persistenceId, seqNr))
+
+ result
+ }
+}
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala
index 3c881b6..c06516d 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/JournalDao.scala
@@ -17,9 +17,6 @@ import java.time.Instant
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
-import io.r2dbc.spi.ConnectionFactory
-import io.r2dbc.spi.Row
-import io.r2dbc.spi.Statement
import org.apache.pekko
import pekko.actor.typed.ActorSystem
import pekko.annotation.InternalApi
@@ -27,12 +24,18 @@ import pekko.dispatch.ExecutionContexts
import pekko.persistence.Persistence
import pekko.persistence.r2dbc.ConnectionFactoryProvider
import pekko.persistence.r2dbc.Dialect
-import pekko.persistence.r2dbc.R2dbcSettings
+import pekko.persistence.r2dbc.JournalSettings
import pekko.persistence.r2dbc.internal.BySliceQuery
+import pekko.persistence.r2dbc.internal.EventsByPersistenceIdDao
+import pekko.persistence.r2dbc.internal.HighestSequenceNrDao
import pekko.persistence.r2dbc.internal.R2dbcExecutor
import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
import pekko.persistence.r2dbc.journal.mysql.MySQLJournalDao
import pekko.persistence.typed.PersistenceId
+import com.typesafe.config.Config
+import io.r2dbc.spi.ConnectionFactory
+import io.r2dbc.spi.Row
+import io.r2dbc.spi.Statement
import org.slf4j.Logger
import org.slf4j.LoggerFactory
@@ -74,16 +77,16 @@ private[r2dbc] object JournalDao {
}
def fromConfig(
- journalSettings: R2dbcSettings,
- sharedConfigPath: String
+ settings: JournalSettings,
+ config: Config
)(implicit system: ActorSystem[_], ec: ExecutionContext): JournalDao = {
val connectionFactory =
- ConnectionFactoryProvider(system).connectionFactoryFor(sharedConfigPath
+ ".connection-factory")
- journalSettings.dialect match {
+
ConnectionFactoryProvider(system).connectionFactoryFor(settings.useConnectionFactory,
config)
+ settings.dialect match {
case Dialect.Postgres | Dialect.Yugabyte =>
- new JournalDao(journalSettings, connectionFactory)
+ new JournalDao(settings, connectionFactory)
case Dialect.MySQL =>
- new MySQLJournalDao(journalSettings, connectionFactory)
+ new MySQLJournalDao(settings, connectionFactory)
}
}
}
@@ -94,22 +97,21 @@ private[r2dbc] object JournalDao {
* Class for doing db interaction outside of an actor to avoid mistakes in
future callbacks
*/
@InternalApi
-private[r2dbc] class JournalDao(journalSettings: R2dbcSettings,
connectionFactory: ConnectionFactory)(
- implicit
- ec: ExecutionContext,
- system: ActorSystem[_]) {
-
+private[r2dbc] class JournalDao(val settings: JournalSettings,
connectionFactory: ConnectionFactory)(
+ implicit val ec: ExecutionContext, system: ActorSystem[_]) extends
EventsByPersistenceIdDao
+ with HighestSequenceNrDao {
import JournalDao.SerializedJournalRow
import JournalDao.log
- implicit protected val dialect: Dialect = journalSettings.dialect
+ implicit protected val dialect: Dialect = settings.dialect
protected lazy val timestampSql: String = "transaction_timestamp()"
+ protected lazy val statementTimestampSql: String = "statement_timestamp()"
private val persistenceExt = Persistence(system)
- private val r2dbcExecutor = new R2dbcExecutor(connectionFactory, log,
journalSettings.logDbCallsExceeding)(ec, system)
+ protected val r2dbcExecutor = new R2dbcExecutor(connectionFactory, log,
settings.logDbCallsExceeding)(ec, system)
- protected val journalTable: String = journalSettings.journalTableWithSchema
+ protected val journalTable: String = settings.journalTableWithSchema
protected val (insertEventWithParameterTimestampSql: String,
insertEventWithTransactionTimestampSql: String) = {
val baseSql =
@@ -125,14 +127,14 @@ private[r2dbc] class JournalDao(journalSettings:
R2dbcSettings, connectionFactor
"WHERE persistence_id = ? AND seq_nr = ?)"
val insertEventWithParameterTimestampSql = {
- if (journalSettings.dbTimestampMonotonicIncreasing)
+ if (settings.dbTimestampMonotonicIncreasing)
sql"$baseSql ?) RETURNING db_timestamp"
else
sql"$baseSql GREATEST(?, $timestampSubSelect)) RETURNING db_timestamp"
}
val insertEventWithTransactionTimestampSql = {
- if (journalSettings.dbTimestampMonotonicIncreasing)
+ if (settings.dbTimestampMonotonicIncreasing)
sql"$baseSql transaction_timestamp()) RETURNING db_timestamp"
else
sql"$baseSql GREATEST(transaction_timestamp(), $timestampSubSelect))
RETURNING db_timestamp"
@@ -141,10 +143,6 @@ private[r2dbc] class JournalDao(journalSettings:
R2dbcSettings, connectionFactor
(insertEventWithParameterTimestampSql,
insertEventWithTransactionTimestampSql)
}
- private val selectHighestSequenceNrSql = sql"""
- SELECT MAX(seq_nr) from $journalTable
- WHERE persistence_id = ? AND seq_nr >= ?"""
-
private val deleteEventsSql = sql"""
DELETE FROM $journalTable
WHERE persistence_id = ? AND seq_nr <= ?"""
@@ -205,12 +203,12 @@ private[r2dbc] class JournalDao(journalSettings:
R2dbcSettings, connectionFactor
}
if (useTimestampFromDb) {
- if (!journalSettings.dbTimestampMonotonicIncreasing)
+ if (!settings.dbTimestampMonotonicIncreasing)
stmt
.bind(13, write.persistenceId)
.bind(14, previousSeqNr)
} else {
- if (journalSettings.dbTimestampMonotonicIncreasing)
+ if (settings.dbTimestampMonotonicIncreasing)
stmt
.bind(13, write.dbTimestamp)
else
@@ -263,26 +261,6 @@ private[r2dbc] class JournalDao(journalSettings:
R2dbcSettings, connectionFactor
}
}
- def readHighestSequenceNr(persistenceId: String, fromSequenceNr: Long):
Future[Long] = {
- val result = r2dbcExecutor
- .select(s"select highest seqNr [$persistenceId]")(
- connection =>
- connection
- .createStatement(selectHighestSequenceNrSql)
- .bind(0, persistenceId)
- .bind(1, fromSequenceNr),
- row => {
- val seqNr = row.get[java.lang.Long](0, classOf[java.lang.Long])
- if (seqNr eq null) 0L else seqNr.longValue
- })
- .map(r => if (r.isEmpty) 0L else r.head)(ExecutionContexts.parasitic)
-
- if (log.isDebugEnabled)
- result.foreach(seqNr => log.debug("Highest sequence nr for persistenceId
[{}]: [{}]", persistenceId, seqNr))
-
- result
- }
-
def deleteMessagesTo(persistenceId: String, toSequenceNr: Long):
Future[Unit] = {
val entityType = PersistenceId.extractEntityType(persistenceId)
val slice = persistenceExt.sliceForPersistenceId(persistenceId)
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/R2dbcJournal.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/R2dbcJournal.scala
index d141ebd..200d43a 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/R2dbcJournal.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/R2dbcJournal.scala
@@ -21,7 +21,7 @@ import scala.concurrent.Future
import scala.util.Failure
import scala.util.Success
import scala.util.Try
-
+import com.typesafe.config.Config
import org.apache.pekko
import pekko.Done
import pekko.actor.ActorRef
@@ -35,18 +35,15 @@ import pekko.persistence.Persistence
import pekko.persistence.PersistentRepr
import pekko.persistence.journal.AsyncWriteJournal
import pekko.persistence.journal.Tagged
-import pekko.persistence.query.PersistenceQuery
-import pekko.persistence.r2dbc.R2dbcSettings
+import pekko.persistence.r2dbc.JournalSettings
import pekko.persistence.r2dbc.internal.PubSub
import pekko.persistence.r2dbc.journal.JournalDao.SerializedEventMetadata
import pekko.persistence.r2dbc.journal.JournalDao.SerializedJournalRow
-import pekko.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
import pekko.persistence.typed.PersistenceId
import pekko.serialization.Serialization
import pekko.serialization.SerializationExtension
import pekko.serialization.Serializers
import pekko.stream.scaladsl.Sink
-import com.typesafe.config.Config
/**
* INTERNAL API
@@ -92,12 +89,10 @@ private[r2dbc] final class R2dbcJournal(config: Config,
cfgPath: String) extends
private val persistenceExt = Persistence(system)
- private val sharedConfigPath = cfgPath.replaceAll("""\.journal$""", "")
private val serialization: Serialization =
SerializationExtension(context.system)
- private val journalSettings =
R2dbcSettings(context.system.settings.config.getConfig(sharedConfigPath))
+ private val journalSettings = JournalSettings(config)
- private val journalDao = JournalDao.fromConfig(journalSettings,
sharedConfigPath)
- private val query =
PersistenceQuery(system).readJournalFor[R2dbcReadJournal](sharedConfigPath +
".query")
+ private val journalDao = JournalDao.fromConfig(journalSettings, config)
private val pubSub: Option[PubSub] =
if (journalSettings.journalPublishEvents) Some(PubSub(system))
@@ -226,7 +221,7 @@ private[r2dbc] final class R2dbcJournal(config: Config,
cfgPath: String) extends
val effectiveToSequenceNr =
if (max == Long.MaxValue) toSequenceNr
else math.min(toSequenceNr, fromSequenceNr + max - 1)
- query
+ journalDao
.internalEventsByPersistenceId(persistenceId, fromSequenceNr,
effectiveToSequenceNr)
.runWith(Sink.foreach { row =>
val repr = deserializeRow(serialization, row)
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/mysql/MySQLJournalDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/mysql/MySQLJournalDao.scala
index 9adba88..e336add 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/mysql/MySQLJournalDao.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/journal/mysql/MySQLJournalDao.scala
@@ -20,29 +20,31 @@
package org.apache.pekko.persistence.r2dbc.journal.mysql
import scala.concurrent.ExecutionContext
-import io.r2dbc.spi.ConnectionFactory
import org.apache.pekko
import pekko.actor.typed.ActorSystem
import pekko.annotation.InternalApi
-import pekko.persistence.r2dbc.R2dbcSettings
+import pekko.persistence.r2dbc.DbTimestampMonotonicIncreasing
+import pekko.persistence.r2dbc.JournalSettings
+import pekko.persistence.r2dbc.UseAppTimestamp
import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
import pekko.persistence.r2dbc.journal.JournalDao
+import io.r2dbc.spi.ConnectionFactory
/**
* INTERNAL API
*/
@InternalApi
private[r2dbc] object MySQLJournalDao {
- def settingRequirements(journalSettings: R2dbcSettings): Unit = {
+ def settingRequirements(settings: UseAppTimestamp with
DbTimestampMonotonicIncreasing): Unit = {
// Application timestamps are used because MySQL does not have
transaction_timestamp like Postgres. In future releases
// they could be tried to be emulated, but the benefits are questionable -
no matter where the timestamps are generated,
// risk of clock skews remains.
- require(journalSettings.useAppTimestamp,
+ require(settings.useAppTimestamp,
"use-app-timestamp config must be on for MySQL support")
// Supporting the non-monotonic increasing timestamps by incrementing the
timestamp within the insert queries based on
// latest row in the database seems to cause deadlocks when running tests
like PersistTimestampSpec. Possibly this could
// be fixed.
- require(journalSettings.dbTimestampMonotonicIncreasing,
+ require(settings.dbTimestampMonotonicIncreasing,
"db-timestamp-monotonic-increasing config must be on for MySQL support")
// Also, missing RETURNING implementation makes grabbing the timestamp
generated by the database less efficient - this
// applies for both of the requirements above.
@@ -54,13 +56,14 @@ private[r2dbc] object MySQLJournalDao {
*/
@InternalApi
private[r2dbc] class MySQLJournalDao(
- journalSettings: R2dbcSettings,
+ journalSettings: JournalSettings,
connectionFactory: ConnectionFactory)(
implicit ec: ExecutionContext, system: ActorSystem[_]
) extends JournalDao(journalSettings, connectionFactory) {
MySQLJournalDao.settingRequirements(journalSettings)
override lazy val timestampSql: String = "NOW(6)"
+ override lazy val statementTimestampSql: String = "NOW(6)"
override val insertEventWithParameterTimestampSql: String =
sql"INSERT INTO $journalTable " +
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
index eebf8bc..20aa370 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/QueryDao.scala
@@ -19,23 +19,26 @@ import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration
-import io.r2dbc.spi.ConnectionFactory
import org.apache.pekko
import pekko.NotUsed
import pekko.actor.typed.ActorSystem
import pekko.annotation.InternalApi
import pekko.persistence.r2dbc.ConnectionFactoryProvider
import pekko.persistence.r2dbc.Dialect
-import pekko.persistence.r2dbc.R2dbcSettings
+import pekko.persistence.r2dbc.QuerySettings
import pekko.persistence.r2dbc.internal.BySliceQuery
import pekko.persistence.r2dbc.internal.BySliceQuery.Buckets
import pekko.persistence.r2dbc.internal.BySliceQuery.Buckets.Bucket
+import pekko.persistence.r2dbc.internal.EventsByPersistenceIdDao
+import pekko.persistence.r2dbc.internal.HighestSequenceNrDao
import pekko.persistence.r2dbc.internal.R2dbcExecutor
import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
import pekko.persistence.r2dbc.journal.JournalDao
import pekko.persistence.r2dbc.journal.JournalDao.SerializedJournalRow
import pekko.persistence.r2dbc.query.scaladsl.mysql.MySQLQueryDao
import pekko.stream.scaladsl.Source
+import com.typesafe.config.Config
+import io.r2dbc.spi.ConnectionFactory
import org.slf4j.Logger
import org.slf4j.LoggerFactory
@@ -43,16 +46,16 @@ object QueryDao {
val log: Logger = LoggerFactory.getLogger(classOf[QueryDao])
def fromConfig(
- journalSettings: R2dbcSettings,
- sharedConfigPath: String
+ settings: QuerySettings,
+ config: Config
)(implicit system: ActorSystem[_], ec: ExecutionContext): QueryDao = {
val connectionFactory =
- ConnectionFactoryProvider(system).connectionFactoryFor(sharedConfigPath
+ ".connection-factory")
- journalSettings.dialect match {
+
ConnectionFactoryProvider(system).connectionFactoryFor(settings.useConnectionFactory,
config)
+ settings.dialect match {
case Dialect.Postgres | Dialect.Yugabyte =>
- new QueryDao(journalSettings, connectionFactory)
+ new QueryDao(settings, connectionFactory)
case Dialect.MySQL =>
- new MySQLQueryDao(journalSettings, connectionFactory)
+ new MySQLQueryDao(settings, connectionFactory)
}
}
}
@@ -61,11 +64,9 @@ object QueryDao {
* INTERNAL API
*/
@InternalApi
-private[r2dbc] class QueryDao(settings: R2dbcSettings, connectionFactory:
ConnectionFactory)(
- implicit
- ec: ExecutionContext,
- system: ActorSystem[_])
- extends BySliceQuery.Dao[SerializedJournalRow] {
+private[r2dbc] class QueryDao(val settings: QuerySettings, connectionFactory:
ConnectionFactory)(
+ implicit val ec: ExecutionContext, system: ActorSystem[_]) extends
BySliceQuery.Dao[SerializedJournalRow]
+ with EventsByPersistenceIdDao with HighestSequenceNrDao {
import JournalDao.readMetadata
import QueryDao.log
@@ -139,21 +140,14 @@ private[r2dbc] class QueryDao(settings: R2dbcSettings,
connectionFactory: Connec
FROM $journalTable
WHERE persistence_id = ? AND seq_nr = ? AND deleted = false"""
- private val selectEventsSql = sql"""
- SELECT slice, entity_type, persistence_id, seq_nr, db_timestamp,
$statementTimestampSql AS read_db_timestamp, event_ser_id, event_ser_manifest,
event_payload, writer, adapter_manifest, meta_ser_id, meta_ser_manifest,
meta_payload
- from $journalTable
- WHERE persistence_id = ? AND seq_nr >= ? AND seq_nr <= ?
- AND deleted = false
- ORDER BY seq_nr
- LIMIT ?"""
-
private val allPersistenceIdsSql =
sql"SELECT DISTINCT(persistence_id) from $journalTable ORDER BY
persistence_id LIMIT ?"
private val allPersistenceIdsAfterSql =
sql"SELECT DISTINCT(persistence_id) from $journalTable WHERE
persistence_id > ? ORDER BY persistence_id LIMIT ?"
- private val r2dbcExecutor = new R2dbcExecutor(connectionFactory, log,
settings.logDbCallsExceeding)(ec, system)
+ protected val r2dbcExecutor =
+ new R2dbcExecutor(connectionFactory, log,
settings.logDbCallsExceeding)(ec, system)
def currentDbTimestamp(): Future[Instant] = {
r2dbcExecutor
@@ -189,9 +183,9 @@ private[r2dbc] class QueryDao(settings: R2dbcSettings,
connectionFactory: Connec
toTimestamp match {
case Some(until) =>
stmt.bind(2, until)
- stmt.bind(3, settings.querySettings.bufferSize)
+ stmt.bind(3, settings.bufferSize)
case None =>
- stmt.bind(2, settings.querySettings.bufferSize)
+ stmt.bind(2, settings.bufferSize)
}
stmt
},
@@ -311,40 +305,6 @@ private[r2dbc] class QueryDao(settings: R2dbcSettings,
connectionFactory: Connec
tags = Set.empty, // tags not fetched in queries (yet)
metadata = readMetadata(row)))
- def eventsByPersistenceId(
- persistenceId: String,
- fromSequenceNr: Long,
- toSequenceNr: Long): Source[SerializedJournalRow, NotUsed] = {
-
- val result = r2dbcExecutor.select(s"select eventsByPersistenceId
[$persistenceId]")(
- connection =>
- connection
- .createStatement(selectEventsSql)
- .bind(0, persistenceId)
- .bind(1, fromSequenceNr)
- .bind(2, toSequenceNr)
- .bind(3, settings.querySettings.bufferSize),
- row =>
- SerializedJournalRow(
- slice = row.get[Integer]("slice", classOf[Integer]),
- entityType = row.get("entity_type", classOf[String]),
- persistenceId = row.get("persistence_id", classOf[String]),
- seqNr = row.get[java.lang.Long]("seq_nr", classOf[java.lang.Long]),
- dbTimestamp = row.get("db_timestamp", classOf[Instant]),
- readDbTimestamp = row.get("read_db_timestamp", classOf[Instant]),
- payload = Some(row.get("event_payload", classOf[Array[Byte]])),
- serId = row.get[Integer]("event_ser_id", classOf[Integer]),
- serManifest = row.get("event_ser_manifest", classOf[String]),
- writerUuid = row.get("writer", classOf[String]),
- tags = Set.empty, // tags not fetched in queries (yet)
- metadata = readMetadata(row)))
-
- if (log.isDebugEnabled)
- result.foreach(rows => log.debug("Read [{}] events for persistenceId
[{}]", rows.size, persistenceId))
-
- Source.futureSource(result.map(Source(_))).mapMaterializedValue(_ =>
NotUsed)
- }
-
def persistenceIds(afterId: Option[String], limit: Long): Source[String,
NotUsed] = {
val result = r2dbcExecutor.select(s"select persistenceIds")(
connection =>
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
index 4280f11..f721f84 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/R2dbcReadJournal.scala
@@ -37,12 +37,10 @@ import
pekko.persistence.query.typed.scaladsl.EventTimestampQuery
import pekko.persistence.query.typed.scaladsl.EventsBySliceQuery
import pekko.persistence.query.typed.scaladsl.LoadEventQuery
import pekko.persistence.query.{ EventEnvelope => ClassicEventEnvelope }
-import pekko.persistence.r2dbc.ConnectionFactoryProvider
-import pekko.persistence.r2dbc.R2dbcSettings
+import pekko.persistence.r2dbc.QuerySettings
import pekko.persistence.r2dbc.internal.BySliceQuery
import pekko.persistence.r2dbc.internal.ContinuousQuery
import pekko.persistence.r2dbc.internal.PubSub
-import pekko.persistence.r2dbc.journal.JournalDao
import pekko.persistence.r2dbc.journal.JournalDao.SerializedJournalRow
import pekko.persistence.typed.PersistenceId
import pekko.serialization.SerializationExtension
@@ -73,16 +71,14 @@ final class R2dbcReadJournal(system: ExtendedActorSystem,
config: Config, cfgPat
import R2dbcReadJournal.PersistenceIdsQueryState
private val log = LoggerFactory.getLogger(getClass)
- private val sharedConfigPath = cfgPath.replaceAll("""\.query$""", "")
- private val settings =
R2dbcSettings(system.settings.config.getConfig(sharedConfigPath))
+ private val settings = QuerySettings(config)
private implicit val typedSystem: ActorSystem[_] = system.toTyped
import typedSystem.executionContext
private val serialization = SerializationExtension(system)
private val persistenceExt = Persistence(system)
- private val connectionFactory = ConnectionFactoryProvider(typedSystem)
- .connectionFactoryFor(sharedConfigPath + ".connection-factory")
- private val queryDao = QueryDao.fromConfig(settings, sharedConfigPath)
+
+ private val queryDao = QueryDao.fromConfig(settings, config)
private val _bySlice: BySliceQuery[SerializedJournalRow, EventEnvelope[Any]]
= {
val createEnvelope: (TimestampOffset, SerializedJournalRow) =>
EventEnvelope[Any] = (offset, row) => {
@@ -107,8 +103,6 @@ final class R2dbcReadJournal(system: ExtendedActorSystem,
config: Config, cfgPat
private def bySlice[Event]: BySliceQuery[SerializedJournalRow,
EventEnvelope[Event]] =
_bySlice.asInstanceOf[BySliceQuery[SerializedJournalRow,
EventEnvelope[Event]]]
- private val journalDao = JournalDao.fromConfig(settings, sharedConfigPath)
-
def extractEntityTypeFromPersistenceId(persistenceId: String): String =
PersistenceId.extractEntityType(persistenceId)
@@ -169,7 +163,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem,
config: Config, cfgPat
.actorRef[EventEnvelope[Event]](
completionMatcher = PartialFunction.empty,
failureMatcher = PartialFunction.empty,
- bufferSize = settings.querySettings.bufferSize,
+ bufferSize = settings.bufferSize,
overflowStrategy = OverflowStrategy.dropNew)
.mapMaterializedValue { ref =>
(minSlice to maxSlice).foreach { slice =>
@@ -177,7 +171,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem,
config: Config, cfgPat
pubSub.eventTopic(entityType, slice) !
Topic.Subscribe(ref.toTyped[EventEnvelope[Event]])
}
}
-
dbSource.merge(pubSubSource).via(deduplicate(settings.querySettings.deduplicateCapacity))
+
dbSource.merge(pubSubSource).via(deduplicate(settings.deduplicateCapacity))
} else
dbSource
}
@@ -227,72 +221,19 @@ final class R2dbcReadJournal(system: ExtendedActorSystem,
config: Config, cfgPat
fromSequenceNr: Long,
toSequenceNr: Long): Source[ClassicEventEnvelope, NotUsed] = {
val highestSeqNrFut =
- if (toSequenceNr == Long.MaxValue)
journalDao.readHighestSequenceNr(persistenceId, fromSequenceNr)
+ if (toSequenceNr == Long.MaxValue)
queryDao.readHighestSequenceNr(persistenceId, fromSequenceNr)
else Future.successful(toSequenceNr)
Source
.futureSource[SerializedJournalRow, NotUsed] {
highestSeqNrFut.map { highestSeqNr =>
- internalEventsByPersistenceId(persistenceId, fromSequenceNr,
highestSeqNr)
+ queryDao.internalEventsByPersistenceId(persistenceId,
fromSequenceNr, highestSeqNr)
}
}
.map(deserializeRow)
.mapMaterializedValue(_ => NotUsed)
}
- /**
- * INTERNAL API: Used by both journal replay and currentEventsByPersistenceId
- */
- @InternalApi private[r2dbc] def internalEventsByPersistenceId(
- persistenceId: String,
- fromSequenceNr: Long,
- toSequenceNr: Long): Source[SerializedJournalRow, NotUsed] = {
- def updateState(state: ByPersistenceIdState, row: SerializedJournalRow):
ByPersistenceIdState =
- state.copy(rowCount = state.rowCount + 1, latestSeqNr = row.seqNr)
-
- def nextQuery(
- state: ByPersistenceIdState,
- highestSeqNr: Long): (ByPersistenceIdState,
Option[Source[SerializedJournalRow, NotUsed]]) = {
- if (state.queryCount == 0L || state.rowCount >=
settings.querySettings.bufferSize) {
-        val newState = state.copy(rowCount = 0, queryCount = state.queryCount + 1)
-
- if (state.queryCount != 0 && log.isDebugEnabled())
- log.debug(
- "currentEventsByPersistenceId query [{}] for persistenceId [{}],
from [{}] to [{}]. Found [{}] rows in previous query.",
- state.queryCount: java.lang.Integer,
- persistenceId,
- state.latestSeqNr + 1: java.lang.Long,
- highestSeqNr: java.lang.Long,
- state.rowCount: java.lang.Integer)
-
- newState -> Some(
- queryDao
- .eventsByPersistenceId(persistenceId, state.latestSeqNr + 1,
highestSeqNr))
- } else {
- log.debug(
- "currentEventsByPersistenceId query [{}] for persistenceId [{}]
completed. Found [{}] rows in previous query.",
- state.queryCount: java.lang.Integer,
- persistenceId,
- state.rowCount: java.lang.Integer)
-
- state -> None
- }
- }
-
- if (log.isDebugEnabled())
- log.debug(
- "currentEventsByPersistenceId query for persistenceId [{}], from [{}]
to [{}].",
- persistenceId,
- fromSequenceNr: java.lang.Long,
- toSequenceNr: java.lang.Long)
-
- ContinuousQuery[ByPersistenceIdState, SerializedJournalRow](
- initialState = ByPersistenceIdState(0, 0, latestSeqNr = fromSequenceNr -
1),
- updateState = updateState,
- delayNextQuery = _ => None,
- nextQuery = state => nextQuery(state, toSequenceNr))
- }
-
// EventTimestampQuery
override def timestampOf(persistenceId: String, sequenceNr: Long):
Future[Option[Instant]] = {
queryDao.timestampOfEvent(persistenceId, sequenceNr)
@@ -323,8 +264,8 @@ final class R2dbcReadJournal(system: ExtendedActorSystem,
config: Config, cfgPat
def delayNextQuery(state: ByPersistenceIdState): Option[FiniteDuration] = {
val delay = ContinuousQuery.adjustNextDelay(
state.rowCount,
- settings.querySettings.bufferSize,
- settings.querySettings.refreshInterval)
+ settings.bufferSize,
+ settings.refreshInterval)
delay.foreach { d =>
log.debug(
@@ -403,7 +344,7 @@ final class R2dbcReadJournal(system: ExtendedActorSystem,
config: Config, cfgPat
queryDao.persistenceIds(afterId, limit)
override def currentPersistenceIds(): Source[String, NotUsed] = {
- import settings.querySettings.persistenceIdsBufferSize
+ import settings.persistenceIdsBufferSize
def updateState(state: PersistenceIdsQueryState, pid: String):
PersistenceIdsQueryState =
state.copy(rowCount = state.rowCount + 1, latestPid = pid)
@@ -439,5 +380,4 @@ final class R2dbcReadJournal(system: ExtendedActorSystem,
config: Config, cfgPat
nextQuery = state => nextQuery(state))
.mapMaterializedValue(_ => NotUsed)
}
-
}
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/mysql/MySQLQueryDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/mysql/MySQLQueryDao.scala
index 8cbc5b6..dc04dda 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/mysql/MySQLQueryDao.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/query/scaladsl/mysql/MySQLQueryDao.scala
@@ -25,22 +25,22 @@ import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration
-import io.r2dbc.spi.ConnectionFactory
import org.apache.pekko
import pekko.actor.typed.ActorSystem
import pekko.annotation.InternalApi
-import pekko.persistence.r2dbc.R2dbcSettings
+import pekko.persistence.r2dbc.QuerySettings
import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
import pekko.persistence.r2dbc.query.scaladsl.QueryDao
+import io.r2dbc.spi.ConnectionFactory
/**
* INTERNAL API
*/
@InternalApi
private[r2dbc] class MySQLQueryDao(
- journalSettings: R2dbcSettings,
+ querySettings: QuerySettings,
connectionFactory: ConnectionFactory
-)(implicit ec: ExecutionContext, system: ActorSystem[_]) extends
QueryDao(journalSettings, connectionFactory) {
+)(implicit ec: ExecutionContext, system: ActorSystem[_]) extends
QueryDao(querySettings, connectionFactory) {
override lazy val statementTimestampSql: String = "NOW(6)"
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/R2dbcSnapshotStore.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/R2dbcSnapshotStore.scala
index 7d637ad..a030edc 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/R2dbcSnapshotStore.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/R2dbcSnapshotStore.scala
@@ -13,20 +13,23 @@
package org.apache.pekko.persistence.r2dbc.snapshot
+import scala.concurrent.ExecutionContext
+import scala.concurrent.Future
import org.apache.pekko
import pekko.actor.typed.ActorSystem
import pekko.actor.typed.scaladsl.adapter._
-import pekko.persistence.{ SelectedSnapshot, SnapshotMetadata,
SnapshotSelectionCriteria }
-import pekko.persistence.r2dbc.R2dbcSettings
-import pekko.persistence.snapshot.SnapshotStore
-import pekko.serialization.{ Serialization, SerializationExtension }
-import com.typesafe.config.Config
-import scala.concurrent.{ ExecutionContext, Future }
-
import pekko.annotation.InternalApi
+import pekko.persistence.r2dbc.SnapshotSettings
import pekko.persistence.r2dbc.snapshot.SnapshotDao.SerializedSnapshotMetadata
import pekko.persistence.r2dbc.snapshot.SnapshotDao.SerializedSnapshotRow
+import pekko.persistence.snapshot.SnapshotStore
+import pekko.persistence.SelectedSnapshot
+import pekko.persistence.SnapshotMetadata
+import pekko.persistence.SnapshotSelectionCriteria
import pekko.serialization.Serializers
+import pekko.serialization.Serialization
+import pekko.serialization.SerializationExtension
+import com.typesafe.config.Config
object R2dbcSnapshotStore {
private def deserializeSnapshotRow(snap: SerializedSnapshotRow,
serialization: Serialization): SelectedSnapshot =
@@ -57,9 +60,8 @@ private[r2dbc] final class R2dbcSnapshotStore(cfg: Config,
cfgPath: String) exte
private implicit val system: ActorSystem[_] = context.system.toTyped
private val dao = {
- val sharedConfigPath = cfgPath.replaceAll("""\.snapshot$""", "")
- val settings =
R2dbcSettings(context.system.settings.config.getConfig(sharedConfigPath))
- SnapshotDao.fromConfig(settings, sharedConfigPath)
+ val settings = SnapshotSettings(cfg)
+ SnapshotDao.fromConfig(settings, cfg)
}
def loadAsync(persistenceId: String, criteria: SnapshotSelectionCriteria):
Future[Option[SelectedSnapshot]] =
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/SnapshotDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/SnapshotDao.scala
index 01f3395..bc3bb94 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/SnapshotDao.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/SnapshotDao.scala
@@ -15,8 +15,6 @@ package org.apache.pekko.persistence.r2dbc.snapshot
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
-import io.r2dbc.spi.ConnectionFactory
-import io.r2dbc.spi.Row
import org.apache.pekko
import pekko.actor.typed.ActorSystem
import pekko.annotation.InternalApi
@@ -25,11 +23,14 @@ import pekko.persistence.Persistence
import pekko.persistence.SnapshotSelectionCriteria
import pekko.persistence.r2dbc.ConnectionFactoryProvider
import pekko.persistence.r2dbc.Dialect
-import pekko.persistence.r2dbc.R2dbcSettings
+import pekko.persistence.r2dbc.SnapshotSettings
import pekko.persistence.r2dbc.internal.R2dbcExecutor
import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
import pekko.persistence.r2dbc.snapshot.mysql.MySQLSnapshotDao
import pekko.persistence.typed.PersistenceId
+import com.typesafe.config.Config
+import io.r2dbc.spi.ConnectionFactory
+import io.r2dbc.spi.Row
import org.slf4j.Logger
import org.slf4j.LoggerFactory
@@ -69,16 +70,16 @@ private[r2dbc] object SnapshotDao {
})
def fromConfig(
- journalSettings: R2dbcSettings,
- sharedConfigPath: String
+ settings: SnapshotSettings,
+ config: Config
)(implicit system: ActorSystem[_], ec: ExecutionContext): SnapshotDao = {
val connectionFactory =
- ConnectionFactoryProvider(system).connectionFactoryFor(sharedConfigPath
+ ".connection-factory")
- journalSettings.dialect match {
+
ConnectionFactoryProvider(system).connectionFactoryFor(settings.useConnectionFactory,
config)
+ settings.dialect match {
case Dialect.Postgres | Dialect.Yugabyte =>
- new SnapshotDao(journalSettings, connectionFactory)
+ new SnapshotDao(settings, connectionFactory)
case Dialect.MySQL =>
- new MySQLSnapshotDao(journalSettings, connectionFactory)
+ new MySQLSnapshotDao(settings, connectionFactory)
}
}
}
@@ -89,7 +90,7 @@ private[r2dbc] object SnapshotDao {
* Class for doing db interaction outside of an actor to avoid mistakes in
future callbacks
*/
@InternalApi
-private[r2dbc] class SnapshotDao(settings: R2dbcSettings, connectionFactory:
ConnectionFactory)(
+private[r2dbc] class SnapshotDao(settings: SnapshotSettings,
connectionFactory: ConnectionFactory)(
implicit
ec: ExecutionContext,
system: ActorSystem[_]) {
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/mysql/MySQLSnapshotDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/mysql/MySQLSnapshotDao.scala
index e725168..6baa0cf 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/mysql/MySQLSnapshotDao.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/snapshot/mysql/MySQLSnapshotDao.scala
@@ -24,7 +24,7 @@ import io.r2dbc.spi.ConnectionFactory
import org.apache.pekko
import pekko.actor.typed.ActorSystem
import pekko.annotation.InternalApi
-import pekko.persistence.r2dbc.R2dbcSettings
+import pekko.persistence.r2dbc.SnapshotSettings
import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
import pekko.persistence.r2dbc.snapshot.SnapshotDao
@@ -33,7 +33,7 @@ import pekko.persistence.r2dbc.snapshot.SnapshotDao
*/
@InternalApi
private[r2dbc] class MySQLSnapshotDao(
- settings: R2dbcSettings, connectionFactory: ConnectionFactory
+ settings: SnapshotSettings, connectionFactory: ConnectionFactory
)(implicit ec: ExecutionContext, system: ActorSystem[_]) extends
SnapshotDao(settings, connectionFactory) {
override val upsertSql = sql"""
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
index 7c9d2c1..2c5d454 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/DurableStateDao.scala
@@ -19,9 +19,6 @@ import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration
-import io.r2dbc.spi.ConnectionFactory
-import io.r2dbc.spi.R2dbcDataIntegrityViolationException
-import io.r2dbc.spi.Statement
import org.apache.pekko
import pekko.Done
import pekko.NotUsed
@@ -31,7 +28,7 @@ import pekko.dispatch.ExecutionContexts
import pekko.persistence.Persistence
import pekko.persistence.r2dbc.ConnectionFactoryProvider
import pekko.persistence.r2dbc.Dialect
-import pekko.persistence.r2dbc.R2dbcSettings
+import pekko.persistence.r2dbc.StateSettings
import pekko.persistence.r2dbc.internal.BySliceQuery
import pekko.persistence.r2dbc.internal.BySliceQuery.Buckets
import pekko.persistence.r2dbc.internal.BySliceQuery.Buckets.Bucket
@@ -40,6 +37,10 @@ import
pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
import pekko.persistence.r2dbc.state.scaladsl.mysql.MySQLDurableStateDao
import pekko.persistence.typed.PersistenceId
import pekko.stream.scaladsl.Source
+import com.typesafe.config.Config
+import io.r2dbc.spi.ConnectionFactory
+import io.r2dbc.spi.R2dbcDataIntegrityViolationException
+import io.r2dbc.spi.Statement
import org.slf4j.Logger
import org.slf4j.LoggerFactory
@@ -64,16 +65,16 @@ import org.slf4j.LoggerFactory
}
def fromConfig(
- journalSettings: R2dbcSettings,
- sharedConfigPath: String
+ settings: StateSettings,
+ config: Config
)(implicit system: ActorSystem[_], ec: ExecutionContext): DurableStateDao = {
val connectionFactory =
- ConnectionFactoryProvider(system).connectionFactoryFor(sharedConfigPath
+ ".connection-factory")
- journalSettings.dialect match {
+
ConnectionFactoryProvider(system).connectionFactoryFor(settings.useConnectionFactory,
config)
+ settings.dialect match {
case Dialect.Postgres | Dialect.Yugabyte =>
- new DurableStateDao(journalSettings, connectionFactory)
+ new DurableStateDao(settings, connectionFactory)
case Dialect.MySQL =>
- new MySQLDurableStateDao(journalSettings, connectionFactory)
+ new MySQLDurableStateDao(settings, connectionFactory)
}
}
}
@@ -84,7 +85,7 @@ import org.slf4j.LoggerFactory
* Class for encapsulating db interaction.
*/
@InternalApi
-private[r2dbc] class DurableStateDao(settings: R2dbcSettings,
connectionFactory: ConnectionFactory)(
+private[r2dbc] class DurableStateDao(settings: StateSettings,
connectionFactory: ConnectionFactory)(
implicit
ec: ExecutionContext,
system: ActorSystem[_])
@@ -360,9 +361,9 @@ private[r2dbc] class DurableStateDao(settings:
R2dbcSettings, connectionFactory:
toTimestamp match {
case Some(until) =>
stmt.bind(2, until)
- stmt.bind(3, settings.querySettings.bufferSize)
+ stmt.bind(3, settings.bufferSize)
case None =>
- stmt.bind(2, settings.querySettings.bufferSize)
+ stmt.bind(2, settings.bufferSize)
}
stmt
},
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala
index 4d7de65..ab30892 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/R2dbcDurableStateStore.scala
@@ -31,7 +31,7 @@ import pekko.persistence.query.TimestampOffset
import pekko.persistence.query.UpdatedDurableState
import
pekko.persistence.query.scaladsl.DurableStateStorePagedPersistenceIdsQuery
import pekko.persistence.query.typed.scaladsl.DurableStateStoreBySliceQuery
-import pekko.persistence.r2dbc.R2dbcSettings
+import pekko.persistence.r2dbc.StateSettings
import pekko.persistence.r2dbc.internal.BySliceQuery
import pekko.persistence.r2dbc.internal.ContinuousQuery
import
pekko.persistence.r2dbc.state.scaladsl.DurableStateDao.SerializedStateRow
@@ -55,14 +55,14 @@ class R2dbcDurableStateStore[A](system:
ExtendedActorSystem, config: Config, cfg
import R2dbcDurableStateStore.PersistenceIdsQueryState
private val log = LoggerFactory.getLogger(getClass)
- private val sharedConfigPath = cfgPath.replaceAll("""\.state$""", "")
- private val settings =
R2dbcSettings(system.settings.config.getConfig(sharedConfigPath))
+ private val settings = StateSettings(config)
private implicit val typedSystem: ActorSystem[_] = system.toTyped
implicit val ec: ExecutionContext = system.dispatcher
private val serialization = SerializationExtension(system)
private val persistenceExt = Persistence(system)
- private val stateDao = DurableStateDao.fromConfig(settings, sharedConfigPath)
+
+ private val stateDao = DurableStateDao.fromConfig(settings, config)
private val bySlice: BySliceQuery[SerializedStateRow, DurableStateChange[A]]
= {
val createEnvelope: (TimestampOffset, SerializedStateRow) =>
DurableStateChange[A] = (offset, row) => {
@@ -152,7 +152,7 @@ class R2dbcDurableStateStore[A](system:
ExtendedActorSystem, config: Config, cfg
stateDao.persistenceIds(afterId, limit)
def currentPersistenceIds(): Source[String, NotUsed] = {
- import settings.querySettings.persistenceIdsBufferSize
+ import settings.persistenceIdsBufferSize
def updateState(state: PersistenceIdsQueryState, pid: String):
PersistenceIdsQueryState =
state.copy(rowCount = state.rowCount + 1, latestPid = pid)
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/mysql/MySQLDurableStateDao.scala
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/mysql/MySQLDurableStateDao.scala
index 385dc21..bcd7056 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/mysql/MySQLDurableStateDao.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/r2dbc/state/scaladsl/mysql/MySQLDurableStateDao.scala
@@ -25,21 +25,21 @@ import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration.Duration
import scala.concurrent.duration.FiniteDuration
-import io.r2dbc.spi.ConnectionFactory
import org.apache.pekko
import pekko.actor.typed.ActorSystem
import pekko.annotation.InternalApi
-import pekko.persistence.r2dbc.R2dbcSettings
+import pekko.persistence.r2dbc.StateSettings
import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
import pekko.persistence.r2dbc.journal.mysql.MySQLJournalDao
import pekko.persistence.r2dbc.state.scaladsl.DurableStateDao
+import io.r2dbc.spi.ConnectionFactory
/**
* INTERNAL API
*/
@InternalApi
private[r2dbc] class MySQLDurableStateDao(
- settings: R2dbcSettings,
+ settings: StateSettings,
connectionFactory: ConnectionFactory
)(implicit ec: ExecutionContext, system: ActorSystem[_]) extends
DurableStateDao(settings, connectionFactory) {
MySQLJournalDao.settingRequirements(settings)
diff --git a/core/src/main/resources/reference.conf
b/core/src/test/resources/config-v1.conf
similarity index 100%
copy from core/src/main/resources/reference.conf
copy to core/src/test/resources/config-v1.conf
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettingsSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettingsSpec.scala
index 324818e..c3257e0 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettingsSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/R2dbcSettingsSpec.scala
@@ -13,25 +13,33 @@
package org.apache.pekko.persistence.r2dbc
+import com.typesafe.config.ConfigException
import com.typesafe.config.ConfigFactory
import io.r2dbc.postgresql.client.SSLMode
+import org.scalatest.Inspectors._
import org.scalatest.TestSuite
import org.scalatest.matchers.should.Matchers
import org.scalatest.wordspec.AnyWordSpec
class R2dbcSettingsSpec extends AnyWordSpec with TestSuite with Matchers {
+ private lazy val configV1 = ConfigFactory.parseResources("config-v1.conf")
+
"Settings" should {
"have table names with schema" in {
- val config =
ConfigFactory.parseString("pekko.persistence.r2dbc.schema=s1").withFallback(ConfigFactory.load())
- val settings = R2dbcSettings(config.getConfig("pekko.persistence.r2dbc"))
- settings.journalTableWithSchema shouldBe "s1.event_journal"
- settings.snapshotsTableWithSchema shouldBe "s1.snapshot"
- settings.durableStateTableWithSchema shouldBe "s1.durable_state"
+ val config =
ConfigFactory.load(ConfigFactory.parseString("pekko.persistence.r2dbc.schema=s1"))
+ val journalSettings =
JournalSettings(config.getConfig("pekko.persistence.r2dbc.journal"))
+ journalSettings.journalTableWithSchema shouldBe "s1.event_journal"
+ val snapshotSettings =
SnapshotSettings(config.getConfig("pekko.persistence.r2dbc.snapshot"))
+ snapshotSettings.snapshotsTableWithSchema shouldBe "s1.snapshot"
+ val stateSettings =
StateSettings(config.getConfig("pekko.persistence.r2dbc.state"))
+ stateSettings.durableStateTableWithSchema shouldBe "s1.durable_state"
// by default connection is configured with options
- settings.connectionFactorySettings shouldBe a[ConnectionFactorySettings]
- settings.connectionFactorySettings.urlOption should not be defined
+ val connectionFactorySettings =
+
ConnectionFactorySettings(config.getConfig("pekko.persistence.r2dbc.connection-factory"))
+ connectionFactorySettings shouldBe a[ConnectionFactorySettings]
+ connectionFactorySettings.urlOption should not be defined
}
"support connection settings build from url" in {
@@ -40,35 +48,55 @@ class R2dbcSettingsSpec extends AnyWordSpec with TestSuite
with Matchers {
.parseString("pekko.persistence.r2dbc.connection-factory.url=whatever-url")
.withFallback(ConfigFactory.load())
- val settings = R2dbcSettings(config.getConfig("pekko.persistence.r2dbc"))
- settings.connectionFactorySettings shouldBe a[ConnectionFactorySettings]
- settings.connectionFactorySettings.urlOption shouldBe defined
+ val settings =
ConnectionFactorySettings(config.getConfig("pekko.persistence.r2dbc.connection-factory"))
+ settings shouldBe a[ConnectionFactorySettings]
+ settings.urlOption shouldBe defined
}
"support ssl-mode as enum name" in {
val config = ConfigFactory
.parseString("pekko.persistence.r2dbc.connection-factory.ssl.mode=VERIFY_FULL")
.withFallback(ConfigFactory.load())
- val settings = R2dbcSettings(config.getConfig("pekko.persistence.r2dbc"))
- settings.connectionFactorySettings.sslMode shouldBe "VERIFY_FULL"
- SSLMode.fromValue(settings.connectionFactorySettings.sslMode) shouldBe
SSLMode.VERIFY_FULL
+ val settings =
ConnectionFactorySettings(config.getConfig("pekko.persistence.r2dbc.connection-factory"))
+ settings.sslMode shouldBe "VERIFY_FULL"
+ SSLMode.fromValue(settings.sslMode) shouldBe SSLMode.VERIFY_FULL
}
"support ssl-mode values in lower and dashes" in {
val config = ConfigFactory
.parseString("pekko.persistence.r2dbc.connection-factory.ssl.mode=verify-full")
.withFallback(ConfigFactory.load())
- val settings = R2dbcSettings(config.getConfig("pekko.persistence.r2dbc"))
- settings.connectionFactorySettings.sslMode shouldBe "verify-full"
- SSLMode.fromValue(settings.connectionFactorySettings.sslMode) shouldBe
SSLMode.VERIFY_FULL
+ val settings =
ConnectionFactorySettings(config.getConfig("pekko.persistence.r2dbc.connection-factory"))
+ settings.sslMode shouldBe "verify-full"
+ SSLMode.fromValue(settings.sslMode) shouldBe SSLMode.VERIFY_FULL
}
"allow to specify ConnectionFactoryOptions customizer" in {
val config = ConfigFactory
.parseString("pekko.persistence.r2dbc.connection-factory.connection-factory-options-customizer=fqcn")
.withFallback(ConfigFactory.load())
- val settings = R2dbcSettings(config.getConfig("pekko.persistence.r2dbc"))
- settings.connectionFactorySettings.connectionFactoryOptionsCustomizer
shouldBe Some("fqcn")
+ val settings =
ConnectionFactorySettings(config.getConfig("pekko.persistence.r2dbc.connection-factory"))
+ settings.connectionFactoryOptionsCustomizer shouldBe Some("fqcn")
+ }
+
+ "not work when not merged with reference config for plugin config with v1
config" in {
+
assertThrows[ConfigException.Missing](JournalSettings(configV1.getConfig("pekko.persistence.r2dbc.journal")))
+
assertThrows[ConfigException.Missing](SnapshotSettings(configV1.getConfig("pekko.persistence.r2dbc.snapshot")))
+
assertThrows[ConfigException.Missing](StateSettings(configV1.getConfig("pekko.persistence.r2dbc.state")))
+ }
+
+ "work when merged with reference config for plugin config with v1 config"
in {
+ val configWithReference = ConfigFactory.load(configV1)
+
+ forEvery(
+ List(
+
JournalSettings(configWithReference.getConfig("pekko.persistence.r2dbc.journal")),
+
SnapshotSettings(configWithReference.getConfig("pekko.persistence.r2dbc.snapshot")),
+
StateSettings(configWithReference.getConfig("pekko.persistence.r2dbc.state"))
+ )
+ ) { settings =>
+ settings.useConnectionFactory shouldBe
"pekko.persistence.r2dbc.connection-factory"
+ }
}
}
}
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestConfig.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestConfig.scala
index 161262e..08789e5 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestConfig.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestConfig.scala
@@ -17,7 +17,7 @@ import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
object TestConfig {
- lazy val config: Config = {
+ lazy val unresolvedConfig: Config = {
val defaultConfig = ConfigFactory.load()
val dialect = defaultConfig.getString("pekko.persistence.r2dbc.dialect")
@@ -46,7 +46,7 @@ object TestConfig {
""")
case "mysql" =>
ConfigFactory.parseString("""
- pekko.persistence.r2dbc{
+ pekko.persistence.r2dbc {
connection-factory {
driver = "mysql"
host = "localhost"
@@ -61,26 +61,23 @@ object TestConfig {
""")
}
- // using load here so that connection-factory can be overridden
- ConfigFactory.load(dialectConfig.withFallback(ConfigFactory.parseString("""
+ dialectConfig.withFallback(ConfigFactory.parseString("""
pekko.loglevel = DEBUG
pekko.persistence.journal.plugin = "pekko.persistence.r2dbc.journal"
pekko.persistence.snapshot-store.plugin =
"pekko.persistence.r2dbc.snapshot"
pekko.persistence.state.plugin = "pekko.persistence.r2dbc.state"
- pekko.persistence.r2dbc {
- query {
- refresh-interval = 1s
- }
- }
+ pekko.persistence.r2dbc.refresh-interval = 1s
pekko.actor {
serialization-bindings {
"org.apache.pekko.persistence.r2dbc.CborSerializable" = jackson-cbor
}
}
pekko.actor.testkit.typed.default-timeout = 10s
- """)))
+ """))
}
+ lazy val config: Config = ConfigFactory.load(unresolvedConfig)
+
val backtrackingDisabledConfig: Config =
-
ConfigFactory.parseString("pekko.persistence.r2dbc.query.backtracking.enabled =
off")
+ ConfigFactory.parseString("pekko.persistence.r2dbc.backtracking.enabled =
off")
}
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestDbLifecycle.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestDbLifecycle.scala
index 3055261..5adfee8 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestDbLifecycle.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/TestDbLifecycle.scala
@@ -15,11 +15,12 @@ package org.apache.pekko.persistence.r2dbc
import scala.concurrent.Await
import scala.concurrent.duration._
-
import org.apache.pekko
import pekko.actor.typed.ActorSystem
import pekko.persistence.Persistence
import pekko.persistence.r2dbc.internal.R2dbcExecutor
+import com.typesafe.config.Config
+import io.r2dbc.spi.ConnectionFactory
import org.scalatest.BeforeAndAfterAll
import org.scalatest.Suite
import org.slf4j.LoggerFactory
@@ -30,32 +31,42 @@ trait TestDbLifecycle extends BeforeAndAfterAll { this:
Suite =>
def testConfigPath: String = "pekko.persistence.r2dbc"
- lazy val r2dbcSettings: R2dbcSettings =
- new R2dbcSettings(typedSystem.settings.config.getConfig(testConfigPath))
+ private lazy val config: Config = typedSystem.settings.config
+
+ lazy val journalSettings: JournalSettings = new
JournalSettings(config.getConfig(testConfigPath + ".journal"))
+
+ lazy val snapshotSettings: SnapshotSettings = new
SnapshotSettings(config.getConfig(testConfigPath + ".snapshot"))
+
+ lazy val stateSettings: StateSettings = new
StateSettings(config.getConfig(testConfigPath + ".state"))
- lazy val r2dbcExecutor: R2dbcExecutor = {
+ // making sure that test harness does not initialize connection factory for
the plugin that is being tested
+ lazy val connectionFactoryProvider: ConnectionFactory =
+ ConnectionFactoryProvider(typedSystem)
+ .connectionFactoryFor("test.connection-factory",
+
config.getConfig("pekko.persistence.r2dbc.connection-factory").atPath("test.connection-factory"))
+
+ // this assumes that journal, snapshot store and state use same connection
settings
+ lazy val r2dbcExecutor: R2dbcExecutor =
new R2dbcExecutor(
-
ConnectionFactoryProvider(typedSystem).connectionFactoryFor(testConfigPath +
".connection-factory"),
+ connectionFactoryProvider,
LoggerFactory.getLogger(getClass),
- r2dbcSettings.logDbCallsExceeding)(typedSystem.executionContext,
typedSystem)
- }
+ journalSettings.logDbCallsExceeding)(typedSystem.executionContext,
typedSystem)
lazy val persistenceExt: Persistence = Persistence(typedSystem)
override protected def beforeAll(): Unit = {
Await.result(
r2dbcExecutor.updateOne("beforeAll delete")(
- _.createStatement(s"delete from
${r2dbcSettings.journalTableWithSchema}")),
+ _.createStatement(s"delete from
${journalSettings.journalTableWithSchema}")),
10.seconds)
Await.result(
r2dbcExecutor.updateOne("beforeAll delete")(
- _.createStatement(s"delete from
${r2dbcSettings.snapshotsTableWithSchema}")),
+ _.createStatement(s"delete from
${snapshotSettings.snapshotsTableWithSchema}")),
10.seconds)
Await.result(
r2dbcExecutor.updateOne("beforeAll delete")(
- _.createStatement(s"delete from
${r2dbcSettings.durableStateTableWithSchema}")),
+ _.createStatement(s"delete from
${stateSettings.durableStateTableWithSchema}")),
10.seconds)
super.beforeAll()
}
-
}
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTagsSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTagsSpec.scala
index d0c6fcc..20427d2 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTagsSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTagsSpec.scala
@@ -14,13 +14,12 @@
package org.apache.pekko.persistence.r2dbc.journal
import scala.concurrent.duration._
-
import org.apache.pekko
import pekko.Done
import pekko.actor.testkit.typed.scaladsl.LogCapturing
import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import pekko.actor.typed.ActorSystem
-import pekko.persistence.r2dbc.R2dbcSettings
+import pekko.persistence.r2dbc.JournalSettings
import pekko.persistence.r2dbc.TestActors.Persister
import pekko.persistence.r2dbc.TestConfig
import pekko.persistence.r2dbc.TestData
@@ -36,11 +35,11 @@ class PersistTagsSpec
with LogCapturing {
override def typedSystem: ActorSystem[_] = system
- private val settings = new
R2dbcSettings(system.settings.config.getConfig("pekko.persistence.r2dbc"))
+ private val settings =
JournalSettings(system.settings.config.getConfig("pekko.persistence.r2dbc.journal"))
case class Row(pid: String, seqNr: Long, tags: Set[String])
- private lazy val dialect =
system.settings.config.getString("pekko.persistence.r2dbc.dialect")
+ private lazy val dialect =
system.settings.config.getString("pekko.persistence.r2dbc.journal.dialect")
private lazy val testEnabled: Boolean = {
// tags are not implemented for MySQL
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTimestampSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTimestampSpec.scala
index b021abb..5b29f6d 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTimestampSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/PersistTimestampSpec.scala
@@ -16,13 +16,12 @@ package org.apache.pekko.persistence.r2dbc.journal
import java.time.Instant
import scala.concurrent.duration._
-
import org.apache.pekko
import pekko.Done
import pekko.actor.testkit.typed.scaladsl.LogCapturing
import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
import pekko.actor.typed.ActorSystem
-import pekko.persistence.r2dbc.R2dbcSettings
+import pekko.persistence.r2dbc.JournalSettings
import pekko.persistence.r2dbc.TestActors.Persister
import pekko.persistence.r2dbc.TestConfig
import pekko.persistence.r2dbc.TestData
@@ -39,7 +38,7 @@ class PersistTimestampSpec
with LogCapturing {
override def typedSystem: ActorSystem[_] = system
- private val settings = new
R2dbcSettings(system.settings.config.getConfig("pekko.persistence.r2dbc"))
+ private val settings =
JournalSettings(system.settings.config.getConfig("pekko.persistence.r2dbc.journal"))
private val serialization = SerializationExtension(system)
case class Row(pid: String, seqNr: Long, dbTimestamp: Instant, event: String)
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/RuntimePluginConfigSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/RuntimePluginConfigSpec.scala
new file mode 100644
index 0000000..ebef1eb
--- /dev/null
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/journal/RuntimePluginConfigSpec.scala
@@ -0,0 +1,321 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pekko.persistence.r2dbc.journal
+
+import scala.collection.immutable.ListSet
+import scala.concurrent.Await
+import scala.concurrent.duration._
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
+import org.apache.pekko
+import pekko.Done
+import pekko.actor.ExtendedActorSystem
+import pekko.actor.testkit.typed.scaladsl.LogCapturing
+import pekko.actor.testkit.typed.scaladsl.ScalaTestWithActorTestKit
+import pekko.actor.typed.ActorRef
+import pekko.actor.typed.ActorSystem
+import pekko.actor.typed.Behavior
+import pekko.actor.typed.scaladsl.adapter._
+import pekko.persistence.JournalProtocol.RecoverySuccess
+import pekko.persistence.JournalProtocol.ReplayMessages
+import pekko.persistence.JournalProtocol.ReplayedMessage
+import pekko.persistence.Persistence
+import pekko.persistence.SelectedSnapshot
+import pekko.persistence.SnapshotProtocol.LoadSnapshot
+import pekko.persistence.SnapshotProtocol.LoadSnapshotResult
+import pekko.persistence.SnapshotSelectionCriteria
+import pekko.persistence.query.PersistenceQuery
+import pekko.persistence.r2dbc.ConnectionFactoryProvider
+import pekko.persistence.r2dbc.JournalSettings
+import pekko.persistence.r2dbc.SnapshotSettings
+import pekko.persistence.r2dbc.StateSettings
+import pekko.persistence.r2dbc.TestConfig
+import pekko.persistence.r2dbc.internal.R2dbcExecutor
+import pekko.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
+import pekko.persistence.r2dbc.state.scaladsl.R2dbcDurableStateStore
+import pekko.persistence.state.scaladsl.GetObjectResult
+import pekko.persistence.typed.PersistenceId
+import pekko.persistence.typed.scaladsl.Effect
+import pekko.persistence.typed.scaladsl.EventSourcedBehavior
+import pekko.persistence.typed.scaladsl.RetentionCriteria
+import pekko.stream.scaladsl.Sink
+import org.scalatest.BeforeAndAfterEach
+import org.scalatest.Inside
+import org.scalatest.wordspec.AnyWordSpecLike
+import org.slf4j.LoggerFactory
+
+object RuntimePluginConfigSpec {
+
+ trait EventSourced {
+ import EventSourced._
+
+ def configKey: String
+ def database: String
+
+ lazy val config: Config =
+ ConfigFactory
+ .load(
+ ConfigFactory
+ .parseString(
+ s"""
+ $configKey = $${pekko.persistence.r2dbc}
+ $configKey = {
+ connection-factory {
+ database = "$database"
+ }
+
+ journal.$configKey.connection-factory =
$${$configKey.connection-factory}
+ journal.use-connection-factory =
"$configKey.connection-factory"
+ query.$configKey.connection-factory =
$${$configKey.connection-factory}
+ query.use-connection-factory = "$configKey.connection-factory"
+ snapshot.$configKey.connection-factory =
$${$configKey.connection-factory}
+ snapshot.use-connection-factory =
"$configKey.connection-factory"
+ }
+ """
+ )
+ .withFallback(TestConfig.unresolvedConfig)
+ )
+
+ def apply(persistenceId: String): Behavior[Command] =
+ EventSourcedBehavior[Command, String, String](
+ PersistenceId.ofUniqueId(persistenceId),
+ "",
+ (state, cmd) =>
+ cmd match {
+ case Save(text, replyTo) =>
+ Effect.persist(text).thenRun(_ => replyTo ! Done)
+ case ShowMeWhatYouGot(replyTo) =>
+ replyTo ! state
+ Effect.none
+ case Stop =>
+ Effect.stop()
+ },
+ (state, evt) => Seq(state, evt).filter(_.nonEmpty).mkString("|"))
+ .withRetention(RetentionCriteria.snapshotEvery(1, Int.MaxValue))
+ .withJournalPluginId(s"$configKey.journal")
+ .withJournalPluginConfig(Some(config))
+ .withSnapshotPluginId(s"$configKey.snapshot")
+ .withSnapshotPluginConfig(Some(config))
+ }
+ object EventSourced {
+ sealed trait Command
+ case class Save(text: String, replyTo: ActorRef[Done]) extends Command
+ case class ShowMeWhatYouGot(replyTo: ActorRef[String]) extends Command
+ case object Stop extends Command
+ }
+
+ trait DurableState {
+ def typedSystem: ActorSystem[_]
+ def configKey: String
+ def database: String
+
+ lazy val config: Config =
+ ConfigFactory
+ .load(
+ ConfigFactory
+ .parseString(
+ s"""
+ $configKey = $${pekko.persistence.r2dbc}
+ $configKey = {
+ connection-factory {
+ database = "$database"
+ }
+
+ state.$configKey.connection-factory =
$${$configKey.connection-factory}
+ state.use-connection-factory = "$configKey.connection-factory"
+ }
+ """
+ )
+ .withFallback(TestConfig.unresolvedConfig)
+ )
+
+ val store = new R2dbcDurableStateStore[String](
+ typedSystem.toClassic.asInstanceOf[ExtendedActorSystem],
+ config.getConfig(s"$configKey.state"),
+ ""
+ )
+ }
+}
+
+class RuntimePluginConfigSpec
+ extends ScalaTestWithActorTestKit(TestConfig.config)
+ with AnyWordSpecLike
+ with BeforeAndAfterEach
+ with LogCapturing
+ with Inside {
+ import RuntimePluginConfigSpec._
+
+ private lazy val eventSourced1 = new EventSourced {
+ override def configKey: String = "plugin1"
+ override def database: String = "database1"
+ }
+ private lazy val eventSourced2 = new EventSourced {
+ override def configKey: String = "plugin2"
+ override def database: String = "database2"
+ }
+
+ private lazy val state1 = new DurableState {
+ override def typedSystem: ActorSystem[_] = system
+ override def configKey: String = "plugin1"
+ override def database: String = "database1"
+ }
+ private lazy val state2 = new DurableState {
+ override def typedSystem: ActorSystem[_] = system
+ override def configKey: String = "plugin2"
+ override def database: String = "database2"
+ }
+
+ override protected def beforeEach(): Unit = {
+ super.beforeAll()
+
+ ListSet(eventSourced1, eventSourced2).foreach { eventSourced =>
+ val journalConfig =
eventSourced.config.getConfig(s"${eventSourced.configKey}.journal")
+ val journalSettings: JournalSettings = JournalSettings(journalConfig)
+
+ val snapshotSettings: SnapshotSettings =
+
SnapshotSettings(eventSourced.config.getConfig(s"${eventSourced.configKey}.snapshot"))
+
+ // making sure that test harness does not initialize connection factory
for the plugin that is being tested
+ val connectionFactoryProvider =
+ ConnectionFactoryProvider(system)
+
.connectionFactoryFor(s"test.${eventSourced.configKey}.connection-factory",
+
journalConfig.getConfig(journalSettings.useConnectionFactory).atPath(
+ s"test.${eventSourced.configKey}.connection-factory"))
+
+ // this assumes that journal, snapshot store and state use same
connection settings
+ val r2dbcExecutor: R2dbcExecutor =
+ new R2dbcExecutor(
+ connectionFactoryProvider,
+ LoggerFactory.getLogger(getClass),
+ journalSettings.logDbCallsExceeding)(system.executionContext, system)
+
+ Await.result(
+ r2dbcExecutor.updateOne("beforeAll delete")(
+ _.createStatement(s"delete from
${journalSettings.journalTableWithSchema}")),
+ 10.seconds)
+ Await.result(
+ r2dbcExecutor.updateOne("beforeAll delete")(
+ _.createStatement(s"delete from
${snapshotSettings.snapshotsTableWithSchema}")),
+ 10.seconds)
+ }
+
+ ListSet(state1, state2).foreach { state =>
+ val stateConfig = state.config.getConfig(s"${state.configKey}.state")
+ val stateSettings: StateSettings = StateSettings(stateConfig)
+
+ // making sure that test harness does not initialize connection factory
for the plugin that is being tested
+ val connectionFactoryProvider =
+ ConnectionFactoryProvider(system)
+ .connectionFactoryFor(s"test.${state.configKey}.connection-factory",
+ state.config.getConfig(stateSettings.useConnectionFactory).atPath(
+ s"test.${state.configKey}.connection-factory"))
+
+ val r2dbcExecutor: R2dbcExecutor =
+ new R2dbcExecutor(
+ connectionFactoryProvider,
+ LoggerFactory.getLogger(getClass),
+ stateSettings.logDbCallsExceeding)(system.executionContext, system)
+
+ Await.result(
+ r2dbcExecutor.updateOne("beforeAll delete")(
+ _.createStatement(s"delete from
${stateSettings.durableStateTableWithSchema}")),
+ 10.seconds)
+ }
+ }
+
+ "Runtime plugin config" should {
+ "work for journal, query and snapshot store plugins" in {
+ val probe = createTestProbe[Any]()
+
+ {
+ // one actor in each journal with same id
+ val j1 = spawn(eventSourced1("id1"))
+ val j2 = spawn(eventSourced2("id1"))
+ j1 ! EventSourced.Save("j1m1", probe.ref)
+ probe.receiveMessage()
+ j2 ! EventSourced.Save("j2m1", probe.ref)
+ probe.receiveMessage()
+ }
+
+ {
+ def assertJournal(eventSourced: EventSourced, expectedEvent: String) =
{
+ val ref =
Persistence(system).journalFor(s"${eventSourced.configKey}.journal",
eventSourced.config)
+ ref.tell(ReplayMessages(0, Long.MaxValue, Long.MaxValue, "id1",
probe.ref.toClassic), probe.ref.toClassic)
+ inside(probe.receiveMessage()) {
+ case ReplayedMessage(persistentRepr) =>
+ persistentRepr.persistenceId shouldBe "id1"
+ persistentRepr.payload shouldBe expectedEvent
+ }
+ probe.expectMessage(RecoverySuccess(1))
+ }
+
+ assertJournal(eventSourced1, "j1m1")
+ assertJournal(eventSourced2, "j2m1")
+ }
+
+ {
+ def assertQuery(eventSourced: EventSourced, expectedEvent: String) = {
+ val readJournal =
+
PersistenceQuery(system).readJournalFor[R2dbcReadJournal](s"${eventSourced.configKey}.query",
+ eventSourced.config)
+ val events = readJournal.currentEventsByPersistenceId("id1", 0,
Long.MaxValue)
+ .map(_.event)
+ .runWith(Sink.seq).futureValue
+ events should contain theSameElementsAs Seq(expectedEvent)
+ }
+
+ assertQuery(eventSourced1, "j1m1")
+ assertQuery(eventSourced2, "j2m1")
+ }
+
+ {
+ def assertSnapshot(eventSourced: EventSourced, expectedShapshot:
String) = {
+ val ref =
Persistence(system).snapshotStoreFor(s"${eventSourced.configKey}.snapshot",
eventSourced.config)
+ ref.tell(LoadSnapshot("id1", SnapshotSelectionCriteria.Latest,
Long.MaxValue),
+ probe.ref.toClassic)
+ inside(probe.receiveMessage()) {
+ case LoadSnapshotResult(Some(SelectedSnapshot(_, snapshot)), _) =>
+ snapshot shouldBe expectedShapshot
+ }
+ }
+
+ assertSnapshot(eventSourced1, "j1m1")
+ assertSnapshot(eventSourced2, "j2m1")
+ }
+ }
+
+ "work for durable state plugin" in {
+ // persist data on both plugins
+ state1.store.upsertObject("id1", 1, "j1m1", "").futureValue
+ state2.store.upsertObject("id1", 1, "j2m1", "").futureValue
+
+ def assertState(state: DurableState, expectedState: String) = {
+ inside(state.store.getObject("id1").futureValue) {
+ case GetObjectResult(Some(value), revision) =>
+ value shouldBe expectedState
+ revision shouldBe 1
+ }
+ }
+
+ assertState(state1, "j1m1")
+ assertState(state2, "j2m1")
+ }
+ }
+}
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsByPersistenceIdSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsByPersistenceIdSpec.scala
index d08d2da..bbb2c9f 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsByPersistenceIdSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsByPersistenceIdSpec.scala
@@ -22,7 +22,6 @@ import pekko.actor.typed.ActorSystem
import pekko.persistence.query.PersistenceQuery
import pekko.persistence.query.TimestampOffset
import pekko.persistence.query.{ EventEnvelope => ClassicEventEnvelope }
-import pekko.persistence.r2dbc.R2dbcSettings
import pekko.persistence.r2dbc.TestActors
import pekko.persistence.r2dbc.TestActors.Persister
import pekko.persistence.r2dbc.TestActors.Persister.PersistWithAck
@@ -52,7 +51,6 @@ class EventsByPersistenceIdSpec
import EventsByPersistenceIdSpec._
override def typedSystem: ActorSystem[_] = system
- private val settings = new
R2dbcSettings(system.settings.config.getConfig("pekko.persistence.r2dbc"))
private val query =
PersistenceQuery(testKit.system).readJournalFor[R2dbcReadJournal](R2dbcReadJournal.Identifier)
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
index d980798..385252e 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceBacktrackingSpec.scala
@@ -24,7 +24,7 @@ import pekko.persistence.query.NoOffset
import pekko.persistence.query.PersistenceQuery
import pekko.persistence.query.typed.EventEnvelope
import pekko.persistence.r2dbc.Dialect
-import pekko.persistence.r2dbc.R2dbcSettings
+import pekko.persistence.r2dbc.QuerySettings
import pekko.persistence.r2dbc.TestConfig
import pekko.persistence.r2dbc.TestData
import pekko.persistence.r2dbc.TestDbLifecycle
@@ -45,7 +45,7 @@ class EventsBySliceBacktrackingSpec
with LogCapturing {
override def typedSystem: ActorSystem[_] = system
- private val settings = new
R2dbcSettings(system.settings.config.getConfig("pekko.persistence.r2dbc"))
+ private val settings =
QuerySettings(system.settings.config.getConfig("pekko.persistence.r2dbc.query"))
private val query = PersistenceQuery(testKit.system)
.readJournalFor[R2dbcReadJournal](R2dbcReadJournal.Identifier)
@@ -135,14 +135,14 @@ class EventsBySliceBacktrackingSpec
writeEvent(slice2, pid2, 1L, startTime.plusMillis(2), "e2-1")
// no backtracking yet
- result.expectNoMessage(settings.querySettings.refreshInterval +
100.millis)
+ result.expectNoMessage(settings.refreshInterval + 100.millis)
 // after 1/2 of the backtracking window, to kick off a backtracking query
writeEvent(
slice1,
pid1,
4L,
-
startTime.plusMillis(settings.querySettings.backtrackingWindow.toMillis /
2).plusMillis(4),
+ startTime.plusMillis(settings.backtrackingWindow.toMillis /
2).plusMillis(4),
"e1-4")
val env6 = result.expectNext()
env6.persistenceId shouldBe pid1
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
index bc2bcfc..125a2f8 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySlicePubSubSpec.scala
@@ -44,15 +44,18 @@ import com.typesafe.config.ConfigFactory
import org.scalatest.wordspec.AnyWordSpecLike
object EventsBySlicePubSubSpec {
- def config: Config = ConfigFactory
- .parseString("""
+ def config: Config =
+ ConfigFactory.load(
+ ConfigFactory
+ .parseString("""
pekko.persistence.r2dbc {
journal.publish-events = on
# no events from database query, only via pub-sub
- query.behind-current-time = 5 minutes
+ behind-current-time = 5 minutes
}
""")
-
.withFallback(TestConfig.backtrackingDisabledConfig.withFallback(TestConfig.config))
+
.withFallback(TestConfig.backtrackingDisabledConfig.withFallback(TestConfig.unresolvedConfig))
+ )
}
class EventsBySlicePubSubSpec
@@ -99,6 +102,8 @@ class EventsBySlicePubSubSpec
"EventsBySlices pub-sub" should {
"publish new events" in new Setup {
+
system.settings.config.getBoolean("pekko.persistence.r2dbc.journal.publish-events")
shouldBe true
+
system.settings.config.getBoolean("pekko.persistence.r2dbc.query.publish-events")
shouldBe true
val result: TestSubscriber.Probe[EventEnvelope[String]] =
query.eventsBySlices[String](setupEntityType, slice, slice,
NoOffset).runWith(sinkProbe).request(10)
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceSpec.scala
index ba2fdd8..6d52110 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/query/EventsBySliceSpec.scala
@@ -29,7 +29,6 @@ import pekko.persistence.query.TimestampOffset
import pekko.persistence.query.typed.EventEnvelope
import pekko.persistence.query.typed.scaladsl.EventTimestampQuery
import pekko.persistence.query.typed.scaladsl.LoadEventQuery
-import pekko.persistence.r2dbc.R2dbcSettings
import pekko.persistence.r2dbc.TestActors
import pekko.persistence.r2dbc.TestActors.Persister
import pekko.persistence.r2dbc.TestActors.Persister.Persist
@@ -83,7 +82,6 @@ class EventsBySliceSpec
import EventsBySliceSpec._
override def typedSystem: ActorSystem[_] = system
- private val settings = new
R2dbcSettings(system.settings.config.getConfig("pekko.persistence.r2dbc"))
private val query =
PersistenceQuery(testKit.system).readJournalFor[R2dbcReadJournal](R2dbcReadJournal.Identifier)
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateBySliceSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateBySliceSpec.scala
index a344643..5c30435 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateBySliceSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateBySliceSpec.scala
@@ -29,7 +29,6 @@ import pekko.persistence.query.NoOffset
import pekko.persistence.query.Offset
import pekko.persistence.query.TimestampOffset
import pekko.persistence.query.UpdatedDurableState
-import pekko.persistence.r2dbc.R2dbcSettings
import pekko.persistence.r2dbc.TestActors
import pekko.persistence.r2dbc.TestActors.DurableStatePersister.Persist
import pekko.persistence.r2dbc.TestActors.DurableStatePersister.PersistWithAck
@@ -71,7 +70,6 @@ class DurableStateBySliceSpec
import DurableStateBySliceSpec._
override def typedSystem: ActorSystem[_] = system
- private val settings = new
R2dbcSettings(system.settings.config.getConfig("pekko.persistence.r2dbc"))
private val query = DurableStateStoreRegistry(testKit.system)
.durableStateStoreFor[R2dbcDurableStateStore[String]](R2dbcDurableStateStore.Identifier)
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateStoreSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateStoreSpec.scala
index 43fdf79..7fb9841 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateStoreSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/r2dbc/state/DurableStateStoreSpec.scala
@@ -83,7 +83,7 @@ class DurableStateStoreSpec
}
"detect and reject concurrent updates" in {
- if (!r2dbcSettings.durableStateAssertSingleWriter)
+ if (!stateSettings.durableStateAssertSingleWriter)
pending
val entityType = nextEntityType()
diff --git a/docs/src/main/paradox/connection-config.md
b/docs/src/main/paradox/config.md
similarity index 53%
rename from docs/src/main/paradox/connection-config.md
rename to docs/src/main/paradox/config.md
index c96a6b6..b6da7ae 100644
--- a/docs/src/main/paradox/connection-config.md
+++ b/docs/src/main/paradox/config.md
@@ -1,4 +1,6 @@
-# Connection configuration
+# Configuration
+
+## Connection configuration
Shared configuration for the connection pool is located under
`pekko.persistence.r2dbc.connection-factory`.
You have to set at least:
@@ -14,6 +16,14 @@ MySQL:
## Reference configuration
-The following can be overridden in your `application.conf` for the connection
settings:
+The following configuration can be overridden in your `application.conf`:
@@snip [reference.conf](/core/src/main/resources/reference.conf)
{#connection-settings}
+
+## Plugin configuration at runtime
+
+The plugin implementations support plugin configuration at runtime.
+
+The following example demonstrates how the database in which the events and
snapshots of an `EventSourcedBehavior` are stored can be configured at runtime:
+
+@@snip
[RuntimePluginConfigExample.scala](/docs/src/test/scala/docs/home/RuntimePluginConfigExample.scala)
{ #runtime-plugin-config }
diff --git a/docs/src/main/paradox/durable-state-store.md
b/docs/src/main/paradox/durable-state-store.md
index 2e8945b..397413f 100644
--- a/docs/src/main/paradox/durable-state-store.md
+++ b/docs/src/main/paradox/durable-state-store.md
@@ -19,7 +19,7 @@ pekko.persistence.state.plugin =
"pekko.persistence.r2dbc.state"
It can also be enabled with the `durableStateStorePluginId` for a specific
`DurableStateBehavior` and multiple
plugin configurations are supported.
-See also @ref:[Connection configuration](connection-config.md).
+See also @ref:[Configuration](config.md).
### Reference configuration
diff --git a/docs/src/main/paradox/index.md b/docs/src/main/paradox/index.md
index 76947a3..599b986 100644
--- a/docs/src/main/paradox/index.md
+++ b/docs/src/main/paradox/index.md
@@ -8,7 +8,7 @@ The Pekko Persistence R2DBC plugin allows for using SQL
database with R2DBC as a
* [overview](overview.md)
* [Getting Started](getting-started.md)
-* [Getting Started](connection-config.md)
+* [Configuration](config.md)
* [License Report](license-report.md)
* [Journal Plugin](journal.md)
* [Snapshot Plugin](snapshots.md)
diff --git a/docs/src/main/paradox/journal.md b/docs/src/main/paradox/journal.md
index cb33548..1b76ea8 100644
--- a/docs/src/main/paradox/journal.md
+++ b/docs/src/main/paradox/journal.md
@@ -23,7 +23,7 @@ pekko.persistence.journal.plugin =
"pekko.persistence.r2dbc.journal"
It can also be enabled with the `journalPluginId` for a specific
`EventSourcedBehavior` and multiple
plugin configurations are supported.
-See also @ref:[Connection configuration](connection-config.md).
+See also @ref:[Configuration](config.md).
### Reference configuration
diff --git a/docs/src/main/paradox/projection.md
b/docs/src/main/paradox/projection.md
index f01c4cc..a1ca6e3 100644
--- a/docs/src/main/paradox/projection.md
+++ b/docs/src/main/paradox/projection.md
@@ -45,7 +45,7 @@ need to be created in the configured database, see schema
definition in @ref:[Cr
## Configuration
By default, `pekko-projection-r2dbc` uses the same connection pool and
`dialect` as `pekko-persistence-r2dbc`, see
-@ref:[Connection configuration](connection-config.md).
+@ref:[Connection configuration](config.md#connection-configuration).
### Reference configuration
diff --git a/docs/src/main/paradox/query.md b/docs/src/main/paradox/query.md
index edf095e..e4ba5c9 100644
--- a/docs/src/main/paradox/query.md
+++ b/docs/src/main/paradox/query.md
@@ -152,5 +152,5 @@ Query configuration is under
`pekko.persistence.r2dbc.query`. Here's the default
@@snip [reference.conf](/core/src/main/resources/reference.conf) {
#query-settings }
-The query plugin shares the connection pool as the rest of the plugin, see
@ref:[Connection configuration](connection-config.md).
+The query plugin shares the connection pool with the rest of the plugin, see
@ref:[Connection configuration](config.md#connection-configuration).
diff --git a/docs/src/main/paradox/snapshots.md
b/docs/src/main/paradox/snapshots.md
index e387a23..f7c1fb3 100644
--- a/docs/src/main/paradox/snapshots.md
+++ b/docs/src/main/paradox/snapshots.md
@@ -17,7 +17,7 @@ pekko.persistence.snapshot-store.plugin =
"pekko.persistence.r2dbc.snapshot"
It can also be enabled with the `snapshotPluginId` for a specific
`EventSourcedBehavior` and multiple
plugin configurations are supported.
-See also @ref:[Connection configuration](connection-config.md).
+See also @ref:[Configuration](config.md).
### Reference configuration
diff --git a/docs/src/test/scala/docs/home/RuntimePluginConfigExample.scala
b/docs/src/test/scala/docs/home/RuntimePluginConfigExample.scala
new file mode 100644
index 0000000..559f54b
--- /dev/null
+++ b/docs/src/test/scala/docs/home/RuntimePluginConfigExample.scala
@@ -0,0 +1,79 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package docs.home
+
+import org.apache.pekko
+import pekko.actor.typed.scaladsl.ActorContext
+import pekko.persistence.typed.PersistenceId
+import pekko.persistence.typed.scaladsl.EventSourcedBehavior
+import com.typesafe.config.Config
+import com.typesafe.config.ConfigFactory
+
+object RuntimePluginConfigExample {
+
+ type Command
+ val context: ActorContext[_] = ???
+
+ // #runtime-plugin-config
+ def eventSourcedBehaviorForDatabase(database: String) = {
+ val configKey = s"config-for-$database"
+
+ val config: Config =
+ ConfigFactory
+ .load(
+ ConfigFactory
+ .parseString(
+ s"""
+ $configKey = $${pekko.persistence.r2dbc}
+ $configKey = {
+ connection-factory {
+ database = "$database"
+ }
+
+ journal.$configKey.connection-factory =
$${$configKey.connection-factory}
+ journal.use-connection-factory =
"$configKey.connection-factory"
+ snapshot.$configKey.connection-factory =
$${$configKey.connection-factory}
+ snapshot.use-connection-factory =
"$configKey.connection-factory"
+ }
+ """
+ )
+ )
+
+ (persistenceId: String) =>
+ EventSourcedBehavior[Command, String, String](
+ PersistenceId.ofUniqueId(persistenceId),
+ emptyState = ???,
+ commandHandler = ???,
+ eventHandler = ???)
+ .withJournalPluginId(s"$configKey.journal")
+ .withJournalPluginConfig(Some(config))
+ .withSnapshotPluginId(s"$configKey.snapshot")
+ .withSnapshotPluginConfig(Some(config))
+ }
+
+ val eventSourcedBehaviorForDatabase1 =
eventSourcedBehaviorForDatabase("database-1")
+ context.spawn(eventSourcedBehaviorForDatabase1("persistence-id-1"),
"Actor-1")
+ context.spawn(eventSourcedBehaviorForDatabase1("persistence-id-2"),
"Actor-2")
+
+ val eventSourcedBehaviorForDatabase2 =
eventSourcedBehaviorForDatabase("database-2")
+ context.spawn(eventSourcedBehaviorForDatabase2("persistence-id-1"),
"Actor-3")
+ context.spawn(eventSourcedBehaviorForDatabase2("persistence-id-2"),
"Actor-4")
+ // #runtime-plugin-config
+}
diff --git
a/migration/src/main/scala/org/apache/pekko/persistence/r2dbc/migration/MigrationTool.scala
b/migration/src/main/scala/org/apache/pekko/persistence/r2dbc/migration/MigrationTool.scala
index 33a9868..055138c 100644
---
a/migration/src/main/scala/org/apache/pekko/persistence/r2dbc/migration/MigrationTool.scala
+++
b/migration/src/main/scala/org/apache/pekko/persistence/r2dbc/migration/MigrationTool.scala
@@ -14,12 +14,12 @@
package org.apache.pekko.persistence.r2dbc.migration
import java.time.Instant
+
import scala.concurrent.Future
import scala.concurrent.duration._
import scala.util.Failure
import scala.util.Success
import scala.util.Try
-
import org.apache.pekko
import pekko.Done
import pekko.actor.typed.ActorSystem
@@ -32,13 +32,14 @@ import pekko.persistence.SelectedSnapshot
import pekko.persistence.SnapshotProtocol.LoadSnapshot
import pekko.persistence.SnapshotProtocol.LoadSnapshotResult
import pekko.persistence.SnapshotSelectionCriteria
-import pekko.persistence.query.{ EventEnvelope => ClassicEventEnvelope }
import pekko.persistence.query.PersistenceQuery
import pekko.persistence.query.scaladsl.CurrentEventsByPersistenceIdQuery
import pekko.persistence.query.scaladsl.CurrentPersistenceIdsQuery
import pekko.persistence.query.scaladsl.ReadJournal
+import pekko.persistence.query.{ EventEnvelope => ClassicEventEnvelope }
import pekko.persistence.r2dbc.ConnectionFactoryProvider
-import pekko.persistence.r2dbc.R2dbcSettings
+import pekko.persistence.r2dbc.JournalSettings
+import pekko.persistence.r2dbc.SnapshotSettings
import pekko.persistence.r2dbc.journal.JournalDao
import pekko.persistence.r2dbc.journal.JournalDao.SerializedEventMetadata
import pekko.persistence.r2dbc.journal.JournalDao.SerializedJournalRow
@@ -116,16 +117,20 @@ class MigrationTool(system: ActorSystem[_]) {
private val parallelism = migrationConfig.getInt("parallelism")
private val targetPluginId =
migrationConfig.getString("target.persistence-plugin-id")
- private val targetR2dbcSettings =
R2dbcSettings(system.settings.config.getConfig(targetPluginId))
+ private val targetConfig = system.settings.config.getConfig(targetPluginId)
+ private val targetJournalSettings =
JournalSettings(targetConfig.getConfig("journal"))
+ private val targetSnapshotettings =
SnapshotSettings(targetConfig.getConfig("snapshot"))
private val serialization: Serialization = SerializationExtension(system)
- private val targetConnectionFactory = ConnectionFactoryProvider(system)
- .connectionFactoryFor(targetPluginId + ".connection-factory")
+ private val targetJournalConnectionFactory =
ConnectionFactoryProvider(system)
+ .connectionFactoryFor(targetJournalSettings.useConnectionFactory)
private val targetJournalDao =
- new JournalDao(targetR2dbcSettings, targetConnectionFactory)
+ new JournalDao(targetJournalSettings, targetJournalConnectionFactory)
private val targetSnapshotDao =
- new SnapshotDao(targetR2dbcSettings, targetConnectionFactory)
+ new SnapshotDao(targetSnapshotettings,
+ ConnectionFactoryProvider(system)
+ .connectionFactoryFor(targetSnapshotettings.useConnectionFactory))
private val targetBatch = migrationConfig.getInt("target.batch")
@@ -138,7 +143,7 @@ class MigrationTool(system: ActorSystem[_]) {
private lazy val sourceSnapshotStore =
Persistence(system).snapshotStoreFor(sourceSnapshotPluginId)
private[r2dbc] val migrationDao =
- new MigrationToolDao(targetConnectionFactory,
targetR2dbcSettings.logDbCallsExceeding)
+ new MigrationToolDao(targetJournalConnectionFactory,
targetJournalSettings.logDbCallsExceeding)
private lazy val createProgressTable: Future[Done] =
migrationDao.createProgressTable()
diff --git
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedEndToEndSpec.scala
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedEndToEndSpec.scala
index f633fc4..b68c9e7 100644
---
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedEndToEndSpec.scala
+++
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedEndToEndSpec.scala
@@ -19,6 +19,7 @@ import java.util.UUID
import scala.concurrent.ExecutionContext
import scala.concurrent.Future
import scala.concurrent.duration._
+import org.apache.pekko.persistence.r2dbc.QuerySettings
import org.apache.pekko
import pekko.Done
import pekko.actor.testkit.typed.scaladsl.LogCapturing
@@ -29,7 +30,7 @@ import pekko.actor.typed.Behavior
import pekko.actor.typed.scaladsl.Behaviors
import pekko.persistence.query.typed.EventEnvelope
import pekko.persistence.r2dbc.Dialect
-import pekko.persistence.r2dbc.R2dbcSettings
+import pekko.persistence.r2dbc.JournalSettings
import pekko.persistence.r2dbc.internal.Sql.DialectInterpolation
import pekko.persistence.r2dbc.query.scaladsl.R2dbcReadJournal
import pekko.persistence.typed.PersistenceId
@@ -141,7 +142,7 @@ class EventSourcedEndToEndSpec
private val log = LoggerFactory.getLogger(getClass)
- private val journalSettings = new
R2dbcSettings(system.settings.config.getConfig("pekko.persistence.r2dbc"))
+ private val querySettings =
QuerySettings(system.settings.config.getConfig("pekko.persistence.r2dbc.query"))
private val projectionSettings = R2dbcProjectionSettings(system)
private val stringSerializer =
SerializationExtension(system).serializerFor(classOf[String])
@@ -154,7 +155,7 @@ class EventSourcedEndToEndSpec
log.debug("Write test event [{}] [{}] [{}] at time [{}]", persistenceId,
seqNr: java.lang.Long, event, timestamp)
implicit val dialect: Dialect = projectionSettings.dialect
val insertEventSql = sql"""
- INSERT INTO ${journalSettings.journalTableWithSchema}
+ INSERT INTO ${querySettings.journalTableWithSchema}
(slice, entity_type, persistence_id, seq_nr, db_timestamp, writer,
adapter_manifest, event_ser_id, event_ser_manifest, event_payload)
VALUES (?, ?, ?, ?, ?, '', '', ?, '', ?)"""
@@ -323,13 +324,12 @@ class EventSourcedEndToEndSpec
// pid3, seqNr 8 is missing (knows 7) when receiving 9
writeEvent(pid3, 9L, startTime.plusMillis(4), "e3-9")
-
processedProbe.expectNoMessage(journalSettings.querySettings.refreshInterval +
2000.millis)
+ processedProbe.expectNoMessage(querySettings.refreshInterval +
2000.millis)
// but backtracking can fill in the gaps, backtracking will pick up pid3
seqNr 8 and 9
writeEvent(pid3, 8L, startTime.plusMillis(3), "e3-8")
val possibleDelay =
- journalSettings.querySettings.backtrackingBehindCurrentTime +
journalSettings.querySettings
- .refreshInterval + processedProbe.remainingOrDefault
+ querySettings.backtrackingBehindCurrentTime +
querySettings.refreshInterval + processedProbe.remainingOrDefault
processedProbe.receiveMessage(possibleDelay).envelope.event shouldBe
"e3-8"
processedProbe.receiveMessage(possibleDelay).envelope.event shouldBe
"e3-9"
diff --git
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedPubSubSpec.scala
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedPubSubSpec.scala
index 9c88477..306d2d8 100644
---
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedPubSubSpec.scala
+++
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/EventSourcedPubSubSpec.scala
@@ -43,23 +43,23 @@ import org.slf4j.LoggerFactory
object EventSourcedPubSubSpec {
- val config: Config = ConfigFactory
- .parseString("""
- pekko.persistence.r2dbc {
- journal.publish-events = on
- query {
+ val config: Config = ConfigFactory.load(
+ ConfigFactory
+ .parseString("""
+ pekko.persistence.r2dbc {
+ journal.publish-events = on
refresh-interval = 3 seconds
- # simulate lost messages by overflowing the buffer
- buffer-size = 10
+ # simulate lost messages by overflowing the buffer
+ buffer-size = 10
- backtracking {
- behind-current-time = 5 seconds
- window = 20 seconds
- }
+ backtracking {
+ behind-current-time = 5 seconds
+ window = 20 seconds
+ }
}
- }
- """)
- .withFallback(TestConfig.config)
+ """)
+ .withFallback(TestConfig.unresolvedConfig)
+ )
final case class Processed(projectionId: ProjectionId, envelope:
EventEnvelope[String])
diff --git
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/TestConfig.scala
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/TestConfig.scala
index fc48aff..aadf9a4 100644
---
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/TestConfig.scala
+++
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/TestConfig.scala
@@ -17,7 +17,7 @@ import com.typesafe.config.Config
import com.typesafe.config.ConfigFactory
object TestConfig {
- lazy val config: Config = {
+ lazy val unresolvedConfig: Config = {
val defaultConfig = ConfigFactory.load()
val dialect = defaultConfig.getString("pekko.projection.r2dbc.dialect")
@@ -62,7 +62,7 @@ object TestConfig {
}
// using load here so that connection-factory can be overridden
- ConfigFactory.load(dialectConfig.withFallback(ConfigFactory.parseString("""
+ dialectConfig.withFallback(ConfigFactory.parseString("""
pekko.persistence.journal.plugin = "pekko.persistence.r2dbc.journal"
pekko.persistence.state.plugin = "pekko.persistence.r2dbc.state"
pekko.persistence.r2dbc {
@@ -71,6 +71,8 @@ object TestConfig {
}
}
pekko.actor.testkit.typed.default-timeout = 10s
- """)))
+ """))
}
+
+ lazy val config: Config = ConfigFactory.load(unresolvedConfig)
}
diff --git
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/TestDbLifecycle.scala
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/TestDbLifecycle.scala
index b794b97..552c5c6 100644
---
a/projection/src/test/scala/org/apache/pekko/projection/r2dbc/TestDbLifecycle.scala
+++
b/projection/src/test/scala/org/apache/pekko/projection/r2dbc/TestDbLifecycle.scala
@@ -15,12 +15,12 @@ package org.apache.pekko.projection.r2dbc
import scala.concurrent.Await
import scala.concurrent.duration._
-
import org.apache.pekko
import pekko.actor.typed.ActorSystem
import pekko.persistence.Persistence
import pekko.persistence.r2dbc.ConnectionFactoryProvider
-import pekko.persistence.r2dbc.R2dbcSettings
+import pekko.persistence.r2dbc.JournalSettings
+import pekko.persistence.r2dbc.StateSettings
import pekko.persistence.r2dbc.internal.R2dbcExecutor
import org.scalatest.BeforeAndAfterAll
import org.scalatest.Suite
@@ -37,7 +37,11 @@ trait TestDbLifecycle extends BeforeAndAfterAll { this:
Suite =>
lazy val r2dbcExecutor: R2dbcExecutor = {
new R2dbcExecutor(
-
ConnectionFactoryProvider(typedSystem).connectionFactoryFor(r2dbcProjectionSettings.useConnectionFactory),
+ // making sure that test harness does not initialize connection factory
for the plugin that is being tested
+ ConnectionFactoryProvider(typedSystem)
+ .connectionFactoryFor("test.connection-factory",
+
typedSystem.settings.config.getConfig(r2dbcProjectionSettings.useConnectionFactory).atPath(
+ "test.connection-factory")),
LoggerFactory.getLogger(getClass),
r2dbcProjectionSettings.logDbCallsExceeding)(typedSystem.executionContext,
typedSystem)
}
@@ -45,15 +49,17 @@ trait TestDbLifecycle extends BeforeAndAfterAll { this:
Suite =>
lazy val persistenceExt: Persistence = Persistence(typedSystem)
override protected def beforeAll(): Unit = {
- lazy val r2dbcSettings: R2dbcSettings =
- new
R2dbcSettings(typedSystem.settings.config.getConfig("pekko.persistence.r2dbc"))
+ lazy val journalSettings: JournalSettings =
+ new
JournalSettings(typedSystem.settings.config.getConfig("pekko.persistence.r2dbc.journal"))
+ lazy val stateSettings: StateSettings =
+ new
StateSettings(typedSystem.settings.config.getConfig("pekko.persistence.r2dbc.state"))
Await.result(
r2dbcExecutor.updateOne("beforeAll delete")(
- _.createStatement(s"delete from
${r2dbcSettings.journalTableWithSchema}")),
+ _.createStatement(s"delete from
${journalSettings.journalTableWithSchema}")),
10.seconds)
Await.result(
r2dbcExecutor.updateOne("beforeAll delete")(
- _.createStatement(s"delete from
${r2dbcSettings.durableStateTableWithSchema}")),
+ _.createStatement(s"delete from
${stateSettings.durableStateTableWithSchema}")),
10.seconds)
if (r2dbcProjectionSettings.isOffsetTableDefined) {
Await.result(
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]