This is an automated email from the ASF dual-hosted git repository.
fanningpj pushed a commit to branch 1.2.x
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-jdbc.git
The following commit(s) were added to refs/heads/1.2.x by this push:
new 2399479 Limit events by tag query ordering sizes (#380) (#381)
2399479 is described below
commit 2399479419b1072c1681eb3eb21312896caff25d
Author: PJ Fanning <[email protected]>
AuthorDate: Mon Nov 24 22:25:50 2025 +0100
Limit events by tag query ordering sizes (#380) (#381)
* Limit events by tag query ordering sizes (#380)
* Limit events by tag query ordering sizes
* fix: The new events-by-tag-buffer-sizes-per-query would miss events for
infrequent tags
file name issue
Update EventsByInfrequentTagTest.scala
* scalafmt
---
core/src/main/resources/reference.conf | 19 +++++
.../jdbc/config/PekkoPersistenceConfig.scala | 2 +
.../jdbc/query/scaladsl/JdbcReadJournal.scala | 22 +++++-
.../jdbc/query/EventsByInfrequentTagTest.scala | 83 ++++++++++++++++++++++
.../scaladsl/DurableStateStorePluginSpec.scala | 2 +-
5 files changed, 124 insertions(+), 4 deletions(-)
diff --git a/core/src/main/resources/reference.conf
b/core/src/main/resources/reference.conf
index 665f900..caeaac7 100644
--- a/core/src/main/resources/reference.conf
+++ b/core/src/main/resources/reference.conf
@@ -413,6 +413,25 @@ jdbc-read-journal {
# are delivered downstream.
max-buffer-size = "500"
+ # Number of 'max-buffer-size's to limit each events by tag query
+ #
+ # Events by tag will fetch batches of elements limiting both using the DB
LIMIT support and
+ # the "ordering" column of the journal. When executing a query starting from
the beginning of the
+ # journal, for example adding a new projection to an existing application
with a large number
+ # of already persisted events this can cause performance problems in some
databases.
+ #
+ # This factor limits the "slices" of ordering the journal is queried for
into smaller chunks,
+ # issuing more queries where each query covers a smaller slice of the
journal instead of one
+ # covering the entire journal.
+ #
+ # Note that setting this too low will have a performance overhead in many
queries being issued where
+ # each query returns no or very few entries, but what number is too low
depends on how many tags are
+ # used and how well those are distributed, setting this value requires
application specific benchmarking
+ # to find a good number.
+ #
+ # 0 means disable the factor and query the entire journal and limit to
max-buffer-size elements
+ events-by-tag-buffer-sizes-per-query = 0
+
# If enabled, automatically close the database connection when the actor
system is terminated
add-shutdown-hook = true
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/config/PekkoPersistenceConfig.scala
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/config/PekkoPersistenceConfig.scala
index 71b5ceb..288a4e0 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/config/PekkoPersistenceConfig.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/config/PekkoPersistenceConfig.scala
@@ -196,6 +196,8 @@ class ReadJournalConfig(config: Config) {
val pluginConfig = new ReadJournalPluginConfig(config)
val refreshInterval: FiniteDuration =
config.asFiniteDuration("refresh-interval")
val maxBufferSize: Int = config.getInt("max-buffer-size")
+ val eventsByTagBufferSizesPerQuery: Long =
config.getLong("events-by-tag-buffer-sizes-per-query")
+ require(eventsByTagBufferSizesPerQuery >= 0,
"events-by-tag-buffer-sizes-per-query must not be negative")
val addShutdownHook: Boolean = config.getBoolean("add-shutdown-hook")
override def toString: String =
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/scaladsl/JdbcReadJournal.scala
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/scaladsl/JdbcReadJournal.scala
index bf34df9..db0e378 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/scaladsl/JdbcReadJournal.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/scaladsl/JdbcReadJournal.scala
@@ -235,15 +235,31 @@ class JdbcReadJournal(config: Config, configPath:
String)(implicit val system: E
import FlowControl._
implicit val askTimeout: Timeout =
Timeout(readJournalConfig.journalSequenceRetrievalConfiguration.askTimeout)
val batchSize = readJournalConfig.maxBufferSize
+ val maxOrderingRange = readJournalConfig.eventsByTagBufferSizesPerQuery
match {
+ case 0 => None
+ case x => Some(x * batchSize)
+ }
+
+ def getLoopMaxOrderingId(offset: Long, latestOrdering: MaxOrderingId):
MaxOrderingId =
+ maxOrderingRange match {
+ case None => latestOrdering
+ case Some(numberOfEvents) =>
+ val limitedMaxOrderingId = offset + numberOfEvents
+ if (limitedMaxOrderingId < 0 || limitedMaxOrderingId >=
latestOrdering.maxOrdering) latestOrdering
+ else MaxOrderingId(limitedMaxOrderingId)
+ }
Source
.unfoldAsync[(Long, FlowControl), Seq[EventEnvelope]]((offset,
Continue)) { case (from, control) =>
def retrieveNextBatch() = {
for {
queryUntil <-
journalSequenceActor.ask(GetMaxOrderingId).mapTo[MaxOrderingId]
- xs <- currentJournalEventsByTag(tag, from, batchSize,
queryUntil).runWith(Sink.seq)
+ loopMaxOrderingId = getLoopMaxOrderingId(from, queryUntil)
+ xs <- currentJournalEventsByTag(tag, from, batchSize,
loopMaxOrderingId).runWith(Sink.seq)
} yield {
- val hasMoreEvents = xs.size == batchSize
+ // continue if query over entire journal was fewer than full batch
or if we are limiting
+ // the query through eventsByTagBufferSizesPerQuery and didn't
reach the last 'ordering' yet
+ val hasMoreEvents = (xs.size == batchSize) ||
(loopMaxOrderingId.maxOrdering < queryUntil.maxOrdering)
val nextControl: FlowControl =
terminateAfterOffset match {
// we may stop if target is behind queryUntil and we don't
have more events to fetch
@@ -260,7 +276,7 @@ class JdbcReadJournal(config: Config, configPath:
String)(implicit val system: E
/* If no events matched the tag between `from` and `maxOrdering`
then there is no need to execute the exact
* same query again. We can continue querying from
`maxOrdering`, which will save some load on the db.
* (Note: we may never return a value smaller than `from`,
otherwise we might return duplicate events) */
- math.max(from, queryUntil.maxOrdering)
+ math.max(from, loopMaxOrderingId.maxOrdering)
} else {
// Continue querying from the largest offset
xs.map(_.offset.value).max
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/EventsByInfrequentTagTest.scala
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/EventsByInfrequentTagTest.scala
new file mode 100644
index 0000000..6707b97
--- /dev/null
+++
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/EventsByInfrequentTagTest.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ * https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2014 - 2019 Dennis Vriend <https://github.com/dnvriend>
+ * Copyright (C) 2019 - 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.jdbc.query
+
+import org.apache.pekko
+import pekko.pattern.ask
+import pekko.persistence.jdbc.query.EventsByInfrequentTagTest._
+import pekko.persistence.query.{ EventEnvelope, NoOffset, Sequence }
+import com.typesafe.config.{ ConfigValue, ConfigValueFactory }
+
+import scala.concurrent.duration._
+
+object EventsByInfrequentTagTest {
+ val maxBufferSize = 20
+ val refreshInterval = 500.milliseconds
+
+ val configOverrides: Map[String, ConfigValue] = Map(
+ "jdbc-read-journal.events-by-tag-buffer-sizes-per-query" ->
ConfigValueFactory.fromAnyRef(1.toString),
+ "jdbc-read-journal.max-buffer-size" ->
ConfigValueFactory.fromAnyRef(maxBufferSize.toString),
+ "jdbc-read-journal.refresh-interval" ->
ConfigValueFactory.fromAnyRef(refreshInterval.toString()))
+}
+
+abstract class EventsByInfrequentTagTest(config: String) extends
QueryTestSpec(config, configOverrides) {
+
+ final val NoMsgTime: FiniteDuration = 100.millis
+ it should "persist and find a tagged event with multiple (frequent and
infrequent) tags" in withActorSystem {
+ implicit system =>
+ pendingIfOracleWithLegacy()
+
+ val journalOps = new ScalaJdbcReadJournalOperations(system)
+ withTestActors(replyToMessages = true) { (actor1, actor2, actor3) =>
+ val often = "often"
+ val notOften = "not-often"
+ withClue("Persisting multiple tagged events") {
+ (0 until 100).foreach { i =>
+ val additional = if (i % 40 == 0) {
+ Seq(notOften)
+ } else Seq.empty
+ val tags = Seq(often) ++ additional
+ (actor1 ? withTags(1, tags: _*)).futureValue
+ }
+
+ eventually {
+ journalOps.countJournal.futureValue shouldBe 100
+ }
+ journalOps.withEventsByTag()(often, NoOffset) { tp =>
+ tp.request(Int.MaxValue)
+ (0 until 100).foreach { i =>
+ tp.expectNextPF { case EventEnvelope(Sequence(i), _, _, _) => }
+ }
+ tp.cancel()
+ tp.expectNoMessage(NoMsgTime)
+ }
+
+ journalOps.withEventsByTag(10.seconds)(notOften, NoOffset) { tp =>
+ tp.request(Int.MaxValue)
+ tp.expectNextPF { case EventEnvelope(Sequence(1), _, _, _) => }
+ tp.expectNextPF { case EventEnvelope(Sequence(41), _, _, _) => }
+ tp.expectNextPF { case EventEnvelope(Sequence(81), _, _, _) => }
+
+ tp.cancel()
+ tp.expectNoMessage(NoMsgTime)
+ }
+ }
+
+ }
+ }
+
+}
+
+class H2ScalaEventsByInfrequentTagTest extends
EventsByInfrequentTagTest("h2-shared-db-application.conf") with H2Cleaner
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/DurableStateStorePluginSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/DurableStateStorePluginSpec.scala
index f3e818f..429e0f3 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/DurableStateStorePluginSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/state/scaladsl/DurableStateStorePluginSpec.scala
@@ -71,7 +71,7 @@ abstract class DurableStateStoreSchemaPluginSpec(val config:
Config, profile: Jd
val customConfig: Config = profile match {
case _: MySQLProfile => ConfigFactory.empty()
- case _ =>
+ case _ =>
ConfigFactory.parseString("""
jdbc-durable-state-store {
tables {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]