This is an automated email from the ASF dual-hosted git repository.

fanningpj pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-jdbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 6f420d2  Limit events by tag query ordering sizes (#380)
6f420d2 is described below

commit 6f420d289378c2b80e65861096a2c1feb7a84afe
Author: PJ Fanning <[email protected]>
AuthorDate: Mon Nov 24 16:39:02 2025 +0100

    Limit events by tag query ordering sizes (#380)
    
    * Limit events by tag query ordering sizes
    
    * fix: The new events-by-tag-buffer-sizes-per-query would miss events for 
infrequent tags
    
    file name issue
    
    Update EventsByInfrequentTagTest.scala
---
 core/src/main/resources/reference.conf             | 19 +++++
 .../jdbc/config/PekkoPersistenceConfig.scala       |  2 +
 .../jdbc/query/scaladsl/JdbcReadJournal.scala      | 22 +++++-
 .../jdbc/query/EventsByInfrequentTagTest.scala     | 83 ++++++++++++++++++++++
 4 files changed, 123 insertions(+), 3 deletions(-)

diff --git a/core/src/main/resources/reference.conf 
b/core/src/main/resources/reference.conf
index 665f900..caeaac7 100644
--- a/core/src/main/resources/reference.conf
+++ b/core/src/main/resources/reference.conf
@@ -413,6 +413,25 @@ jdbc-read-journal {
   # are delivered downstreams.
   max-buffer-size = "500"
 
+  # Multiplier of 'max-buffer-size' used to limit the range of each events-by-tag query
+  #
+  # Events by tag will fetch batches of elements, limited both by the DB 
LIMIT support and
+  # the "ordering" column of the journal. When executing a query starting from 
the beginning of the
+  # journal, for example adding a new projection to an existing application 
with a large number
+  # of already persisted events this can cause performance problems in some 
databases.
+  #
+  # This factor limits the "slices" of ordering the journal is queried for 
into smaller chunks,
+  # issuing more queries where each query covers a smaller slice of the 
journal instead of one
+  # covering the entire journal.
+  #
+  # Note that setting this too low will have a performance overhead in many 
queries being issued where
+  # each query returns no or very few entries, but what number is too low 
depends on how many tags are
+  # used and how well those are distributed, setting this value requires 
application specific benchmarking
+  # to find a good number.
+  #
+  # 0 means disable the factor and query the entire journal and limit to 
max-buffer-size elements
+  events-by-tag-buffer-sizes-per-query = 0
+
   # If enabled, automatically close the database connection when the actor 
system is terminated
   add-shutdown-hook = true
 
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/config/PekkoPersistenceConfig.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/config/PekkoPersistenceConfig.scala
index 71b5ceb..288a4e0 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/config/PekkoPersistenceConfig.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/config/PekkoPersistenceConfig.scala
@@ -196,6 +196,8 @@ class ReadJournalConfig(config: Config) {
   val pluginConfig = new ReadJournalPluginConfig(config)
   val refreshInterval: FiniteDuration = 
config.asFiniteDuration("refresh-interval")
   val maxBufferSize: Int = config.getInt("max-buffer-size")
+  val eventsByTagBufferSizesPerQuery: Long = 
config.getLong("events-by-tag-buffer-sizes-per-query")
+  require(eventsByTagBufferSizesPerQuery >= 0, 
"events-by-tag-buffer-sizes-per-query must not be negative")
   val addShutdownHook: Boolean = config.getBoolean("add-shutdown-hook")
 
   override def toString: String =
diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/scaladsl/JdbcReadJournal.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/scaladsl/JdbcReadJournal.scala
index 6b4a879..4a8c39a 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/scaladsl/JdbcReadJournal.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/query/scaladsl/JdbcReadJournal.scala
@@ -235,15 +235,31 @@ class JdbcReadJournal(config: Config, configPath: 
String)(implicit val system: E
     import FlowControl._
     implicit val askTimeout: Timeout = 
Timeout(readJournalConfig.journalSequenceRetrievalConfiguration.askTimeout)
     val batchSize = readJournalConfig.maxBufferSize
+    val maxOrderingRange = readJournalConfig.eventsByTagBufferSizesPerQuery 
match {
+      case 0 => None
+      case x => Some(x * batchSize)
+    }
+
+    def getLoopMaxOrderingId(offset: Long, latestOrdering: MaxOrderingId): 
MaxOrderingId =
+      maxOrderingRange match {
+        case None                 => latestOrdering
+        case Some(numberOfEvents) =>
+          val limitedMaxOrderingId = offset + numberOfEvents
+          if (limitedMaxOrderingId < 0 || limitedMaxOrderingId >= 
latestOrdering.maxOrdering) latestOrdering
+          else MaxOrderingId(limitedMaxOrderingId)
+      }
 
     Source
       .unfoldAsync[(Long, FlowControl), Seq[EventEnvelope]]((offset, 
Continue)) { case (from, control) =>
         def retrieveNextBatch() = {
           for {
             queryUntil <- 
journalSequenceActor.ask(GetMaxOrderingId).mapTo[MaxOrderingId]
-            xs <- currentJournalEventsByTag(tag, from, batchSize, 
queryUntil).runWith(Sink.seq)
+            loopMaxOrderingId = getLoopMaxOrderingId(from, queryUntil)
+            xs <- currentJournalEventsByTag(tag, from, batchSize, 
loopMaxOrderingId).runWith(Sink.seq)
           } yield {
-            val hasMoreEvents = xs.size == batchSize
+            // continue if query over entire journal was fewer than full batch 
or if we are limiting
+            // the query through eventsByTagBufferSizesPerQuery and didn't 
reach the last 'ordering' yet
+            val hasMoreEvents = (xs.size == batchSize) || 
(loopMaxOrderingId.maxOrdering < queryUntil.maxOrdering)
             val nextControl: FlowControl =
               terminateAfterOffset match {
                 // we may stop if target is behind queryUntil and we don't 
have more events to fetch
@@ -260,7 +276,7 @@ class JdbcReadJournal(config: Config, configPath: 
String)(implicit val system: E
               /* If no events matched the tag between `from` and `maxOrdering` 
then there is no need to execute the exact
                * same query again. We can continue querying from 
`maxOrdering`, which will save some load on the db.
                * (Note: we may never return a value smaller than `from`, 
otherwise we might return duplicate events) */
-              math.max(from, queryUntil.maxOrdering)
+              math.max(from, loopMaxOrderingId.maxOrdering)
             } else {
               // Continue querying from the largest offset
               xs.map(_.offset.value).max
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/EventsByInfrequentTagTest.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/EventsByInfrequentTagTest.scala
new file mode 100644
index 0000000..6707b97
--- /dev/null
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/EventsByInfrequentTagTest.scala
@@ -0,0 +1,83 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * license agreements; and to You under the Apache License, version 2.0:
+ *
+ *   https://www.apache.org/licenses/LICENSE-2.0
+ *
+ * This file is part of the Apache Pekko project, which was derived from Akka.
+ */
+
+/*
+ * Copyright (C) 2014 - 2019 Dennis Vriend <https://github.com/dnvriend>
+ * Copyright (C) 2019 - 2022 Lightbend Inc. <https://www.lightbend.com>
+ */
+
+package org.apache.pekko.persistence.jdbc.query
+
+import org.apache.pekko
+import pekko.pattern.ask
+import pekko.persistence.jdbc.query.EventsByInfrequentTagTest._
+import pekko.persistence.query.{ EventEnvelope, NoOffset, Sequence }
+import com.typesafe.config.{ ConfigValue, ConfigValueFactory }
+
+import scala.concurrent.duration._
+
+object EventsByInfrequentTagTest {
+  val maxBufferSize = 20
+  val refreshInterval = 500.milliseconds
+
+  val configOverrides: Map[String, ConfigValue] = Map(
+    "jdbc-read-journal.events-by-tag-buffer-sizes-per-query" -> 
ConfigValueFactory.fromAnyRef(1.toString),
+    "jdbc-read-journal.max-buffer-size" -> 
ConfigValueFactory.fromAnyRef(maxBufferSize.toString),
+    "jdbc-read-journal.refresh-interval" -> 
ConfigValueFactory.fromAnyRef(refreshInterval.toString()))
+}
+
+abstract class EventsByInfrequentTagTest(config: String) extends 
QueryTestSpec(config, configOverrides) {
+
+  final val NoMsgTime: FiniteDuration = 100.millis
+  it should "persist and find a tagged event with multiple (frequent and 
infrequent) tags" in withActorSystem {
+    implicit system =>
+      pendingIfOracleWithLegacy()
+
+      val journalOps = new ScalaJdbcReadJournalOperations(system)
+      withTestActors(replyToMessages = true) { (actor1, actor2, actor3) =>
+        val often = "often"
+        val notOften = "not-often"
+        withClue("Persisting multiple tagged events") {
+          (0 until 100).foreach { i =>
+            val additional = if (i % 40 == 0) {
+              Seq(notOften)
+            } else Seq.empty
+            val tags = Seq(often) ++ additional
+            (actor1 ? withTags(1, tags: _*)).futureValue
+          }
+
+          eventually {
+            journalOps.countJournal.futureValue shouldBe 100
+          }
+          journalOps.withEventsByTag()(often, NoOffset) { tp =>
+            tp.request(Int.MaxValue)
+            (0 until 100).foreach { i =>
+              tp.expectNextPF { case EventEnvelope(Sequence(i), _, _, _) => }
+            }
+            tp.cancel()
+            tp.expectNoMessage(NoMsgTime)
+          }
+
+          journalOps.withEventsByTag(10.seconds)(notOften, NoOffset) { tp =>
+            tp.request(Int.MaxValue)
+            tp.expectNextPF { case EventEnvelope(Sequence(1), _, _, _) => }
+            tp.expectNextPF { case EventEnvelope(Sequence(41), _, _, _) => }
+            tp.expectNextPF { case EventEnvelope(Sequence(81), _, _, _) => }
+
+            tp.cancel()
+            tp.expectNoMessage(NoMsgTime)
+          }
+        }
+
+      }
+  }
+
+}
+
+class H2ScalaEventsByInfrequentTagTest extends 
EventsByInfrequentTagTest("h2-shared-db-application.conf") with H2Cleaner


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to