This is an automated email from the ASF dual-hosted git repository.

roiocam pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-jdbc.git


The following commit(s) were added to refs/heads/main by this push:
     new ab2f6ce  perf: avoid large offset query via limit windowing (#180)
ab2f6ce is described below

commit ab2f6ce103ff7cb5de73bbebc58752330b499dba
Author: AndyChen(Jingzhang) <[email protected]>
AuthorDate: Mon May 27 09:11:03 2024 +0800

    perf: avoid large offset query via limit windowing (#180)
    
    * perf: avoid large offset query via limit windowing
    
    * add unit tests, simplify legacy
    
    * make legacy it works, new dao it
    
    * fix config
    
    * use single unit test
    
    * optimized it
    
    * fix
    
    * scala 2.12 build, oracle npe
    
    * fix scala 2.12 build, oracle npe
    
    * optimized imports
    
    * optimized import
---
 .../dao/BaseJournalDaoWithReadMessages.scala       |  23 ++-
 .../journal/dao/LimitWindowingStreamTest.scala     |  91 ++++++++++
 .../query/JournalDaoStreamMessagesMemoryTest.scala | 195 ++++++++++-----------
 .../persistence/jdbc/query/QueryTestSpec.scala     |  38 +++-
 4 files changed, 231 insertions(+), 116 deletions(-)

diff --git 
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala
 
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala
index dee3ba6..629ac0c 100644
--- 
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala
+++ 
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala
@@ -17,14 +17,14 @@ package org.apache.pekko.persistence.jdbc.journal.dao
 import org.apache.pekko
 import pekko.NotUsed
 import pekko.actor.Scheduler
+import pekko.annotation.InternalApi
 import pekko.persistence.PersistentRepr
 import pekko.persistence.jdbc.journal.dao.FlowControl.{ Continue, 
ContinueDelayed, Stop }
 import pekko.stream.Materializer
 import pekko.stream.scaladsl.{ Sink, Source }
 
-import scala.collection.immutable.Seq
-import scala.concurrent.{ ExecutionContext, Future }
 import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.{ ExecutionContext, Future }
 import scala.util.{ Failure, Success, Try }
 
 trait BaseJournalDaoWithReadMessages extends JournalDaoWithReadMessages {
@@ -38,13 +38,29 @@ trait BaseJournalDaoWithReadMessages extends 
JournalDaoWithReadMessages {
       toSequenceNr: Long,
       batchSize: Int,
       refreshInterval: Option[(FiniteDuration, Scheduler)]): 
Source[Try[(PersistentRepr, Long)], NotUsed] = {
+    internalBatchStream(persistenceId, fromSequenceNr, toSequenceNr, 
batchSize, refreshInterval).mapConcat(identity)
+  }
 
+  /**
+   * Emits events one batch at a time; split out so unit tests can inspect the batches.
+   */
+  @InternalApi
+  private[dao] def internalBatchStream(
+      persistenceId: String,
+      fromSequenceNr: Long,
+      toSequenceNr: Long,
+      batchSize: Int,
+      refreshInterval: Option[(FiniteDuration, Scheduler)]) = {
     Source
       .unfoldAsync[(Long, FlowControl), Seq[Try[(PersistentRepr, 
Long)]]]((Math.max(1, fromSequenceNr), Continue)) {
         case (from, control) =>
+          def limitWindow(from: Long): Long = {
+            math.min(from + batchSize, toSequenceNr)
+          }
+
           def retrieveNextBatch(): Future[Option[((Long, FlowControl), 
Seq[Try[(PersistentRepr, Long)]])]] = {
             for {
-              xs <- messages(persistenceId, from, toSequenceNr, 
batchSize).runWith(Sink.seq)
+              xs <- messages(persistenceId, from, limitWindow(from), 
batchSize).runWith(Sink.seq)
             } yield {
               val hasMoreEvents = xs.size == batchSize
               // Events are ordered by sequence number, therefore the last one 
is the largest
@@ -77,7 +93,6 @@ trait BaseJournalDaoWithReadMessages extends 
JournalDaoWithReadMessages {
               pekko.pattern.after(delay, scheduler)(retrieveNextBatch())
           }
       }
-      .mapConcat(identity(_))
   }
 
 }
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/LimitWindowingStreamTest.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/LimitWindowingStreamTest.scala
new file mode 100644
index 0000000..e9e8ccd
--- /dev/null
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/LimitWindowingStreamTest.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pekko.persistence.jdbc.journal.dao
+
+import org.apache.pekko
+import pekko.persistence.jdbc.journal.dao.LimitWindowingStreamTest.fetchSize
+import pekko.persistence.jdbc.query.{ H2Cleaner, QueryTestSpec }
+import pekko.persistence.{ AtomicWrite, PersistentRepr }
+import pekko.stream.scaladsl.{ Keep, Sink, Source }
+import pekko.stream.{ Materializer, SystemMaterializer }
+import com.typesafe.config.{ ConfigValue, ConfigValueFactory }
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
+import org.slf4j.LoggerFactory
+
+import java.util.UUID
+import scala.collection.immutable
+import scala.concurrent.duration._
+import scala.concurrent.{ Await, ExecutionContext, Future }
+
+object LimitWindowingStreamTest {
+  val fetchSize = 100
+  val configOverrides: Map[String, ConfigValue] =
+    Map("jdbc-journal.fetch-size" -> ConfigValueFactory.fromAnyRef(fetchSize))
+}
+
+abstract class LimitWindowingStreamTest(configFile: String)
+    extends QueryTestSpec(configFile, 
LimitWindowingStreamTest.configOverrides) {
+
+  private val log = LoggerFactory.getLogger(this.getClass)
+
+  it should "stream events with limit windowing" in withActorSystem { implicit 
system =>
+    implicit val ec: ExecutionContext = system.dispatcher
+    implicit val mat: Materializer = SystemMaterializer(system).materializer
+
+    val persistenceId = UUID.randomUUID().toString
+    val payload = 'a'.toByte
+    val eventsPerBatch = 1000
+    val numberOfInsertBatches = 16
+    val totalMessages = numberOfInsertBatches * eventsPerBatch
+
+    withDao { dao =>
+      val lastInsert =
+        Source
+          .fromIterator(() => (1 to numberOfInsertBatches).toIterator)
+          .mapAsync(1) { i =>
+            val end = i * eventsPerBatch
+            val start = end - (eventsPerBatch - 1)
+            log.info(s"batch $i (events from $start to $end")
+            val atomicWrites =
+              (start to end).map { j =>
+                AtomicWrite(immutable.Seq(PersistentRepr(payload, j, 
persistenceId)))
+              }
+            dao.asyncWriteMessages(atomicWrites).map(_ => i)
+          }
+          .runWith(Sink.last)
+
+      lastInsert.futureValue(Timeout(totalMessages.seconds))
+      val readMessagesDao = dao.asInstanceOf[BaseJournalDaoWithReadMessages]
+      val messagesSrc =
+        readMessagesDao.internalBatchStream(persistenceId, 0, totalMessages, 
batchSize = fetchSize, None)
+
+      val eventualSum: Future[(Int, Int)] = messagesSrc.toMat(Sink.fold((0, 
0)) { case ((accBatch, accTotal), seq) =>
+        (accBatch + 1, accTotal + seq.size)
+      })(Keep.right).run()
+
+      val (batchCount, totalCount) = Await.result(eventualSum, Duration.Inf)
+      val totalBatch = totalMessages / fetchSize
+      batchCount shouldBe totalBatch
+      totalCount shouldBe totalMessages
+    }
+  }
+}
+
+class H2LimitWindowingStreamTest extends 
LimitWindowingStreamTest("h2-application.conf") with H2Cleaner
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/JournalDaoStreamMessagesMemoryTest.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/JournalDaoStreamMessagesMemoryTest.scala
index 02625e7..216d7b8 100644
--- 
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/JournalDaoStreamMessagesMemoryTest.scala
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/JournalDaoStreamMessagesMemoryTest.scala
@@ -14,16 +14,15 @@
 
 package org.apache.pekko.persistence.jdbc.query
 
-import java.lang.management.ManagementFactory
-import java.lang.management.MemoryMXBean
+import java.lang.management.{ ManagementFactory, MemoryMXBean }
 import java.util.UUID
 
 import org.apache.pekko
-import pekko.actor.ActorSystem
+import 
pekko.persistence.jdbc.query.JournalDaoStreamMessagesMemoryTest.fetchSize
 import pekko.persistence.{ AtomicWrite, PersistentRepr }
-import pekko.persistence.jdbc.journal.dao.legacy.{ ByteArrayJournalDao, 
JournalTables }
-import pekko.serialization.SerializationExtension
 import pekko.stream.scaladsl.{ Sink, Source }
+import pekko.stream.testkit.scaladsl.TestSink
+import pekko.stream.{ Materializer, SystemMaterializer }
 import com.typesafe.config.{ ConfigValue, ConfigValueFactory }
 import org.scalatest.concurrent.PatienceConfiguration.Timeout
 import org.slf4j.LoggerFactory
@@ -32,120 +31,110 @@ import scala.collection.immutable
 import scala.concurrent.ExecutionContext
 import scala.concurrent.duration._
 import scala.util.{ Failure, Success }
-import pekko.stream.testkit.scaladsl.TestSink
-import org.scalatest.matchers.should.Matchers
 
 object JournalDaoStreamMessagesMemoryTest {
 
-  val configOverrides: Map[String, ConfigValue] = 
Map("jdbc-journal.fetch-size" -> ConfigValueFactory.fromAnyRef("100"))
+  val fetchSize: Int = 100
+  val MB: Int = 1024 * 1024
 
-  val MB = 1024 * 1024
+  val configOverrides: Map[String, ConfigValue] = Map(
+    "jdbc-journal.fetch-size" -> ConfigValueFactory.fromAnyRef("100"))
 }
 
 abstract class JournalDaoStreamMessagesMemoryTest(configFile: String)
-    extends QueryTestSpec(configFile, 
JournalDaoStreamMessagesMemoryTest.configOverrides)
-    with JournalTables
-    with Matchers {
+    extends QueryTestSpec(configFile, 
JournalDaoStreamMessagesMemoryTest.configOverrides) {
+
   import JournalDaoStreamMessagesMemoryTest.MB
 
   private val log = LoggerFactory.getLogger(this.getClass)
 
-  val journalSequenceActorConfig = 
readJournalConfig.journalSequenceRetrievalConfiguration
-  val journalTableCfg = journalConfig.journalTableConfiguration
+  val memoryMBean: MemoryMXBean = ManagementFactory.getMemoryMXBean
 
-  implicit val askTimeout: FiniteDuration = 50.millis
+  it should "stream events" in withActorSystem { implicit system =>
+    implicit val ec: ExecutionContext = system.dispatcher
+    implicit val mat: Materializer = SystemMaterializer(system).materializer
 
-  def generateId: Int = 0
+    withDao { dao =>
+      val persistenceId = UUID.randomUUID().toString
 
-  val memoryMBean: MemoryMXBean = ManagementFactory.getMemoryMXBean
+      val writerUuid = UUID.randomUUID().toString
+
+      val payloadSize = 5000 // 5000 bytes
+      val eventsPerBatch = 1000
 
-  behavior.of("Replaying Persistence Actor")
-
-  it should "stream events" in {
-    if (newDao)
-      pending
-    withActorSystem { implicit system: ActorSystem =>
-      withDatabase { db =>
-        implicit val ec: ExecutionContext = system.dispatcher
-
-        val persistenceId = UUID.randomUUID().toString
-        val dao = new ByteArrayJournalDao(db, profile, journalConfig, 
SerializationExtension(system))
-
-        val payloadSize = 5000 // 5000 bytes
-        val eventsPerBatch = 1000
-
-        val maxMem = 64 * MB
-
-        val numberOfInsertBatches = {
-          // calculate the number of batches using a factor to make sure we go 
a little bit over the limit
-          (maxMem / (payloadSize * eventsPerBatch) * 1.2).round.toInt
-        }
-        val totalMessages = numberOfInsertBatches * eventsPerBatch
-        val totalMessagePayload = totalMessages * payloadSize
-        log.info(
-          s"batches: $numberOfInsertBatches (with $eventsPerBatch events), 
total messages: $totalMessages, total msgs size: $totalMessagePayload")
-
-        // payload can be the same when inserting to avoid unnecessary memory 
usage
-        val payload = Array.fill(payloadSize)('a'.toByte)
-
-        val lastInsert =
-          Source
-            .fromIterator(() => (1 to numberOfInsertBatches).toIterator)
-            .mapAsync(1) { i =>
-              val end = i * eventsPerBatch
-              val start = end - (eventsPerBatch - 1)
-              log.info(s"batch $i - events from $start to $end")
-              val atomicWrites =
-                (start to end).map { j =>
-                  AtomicWrite(immutable.Seq(PersistentRepr(payload, j, 
persistenceId)))
-                }.toSeq
-
-              dao.asyncWriteMessages(atomicWrites).map(_ => i)
-            }
-            .runWith(Sink.last)
-
-        // wait until we write all messages
-        // being very generous, 1 second per message
-        lastInsert.futureValue(Timeout(totalMessages.seconds))
-
-        log.info("Events written, starting replay")
-
-        // sleep and gc to have some kind of stable measurement of current 
heap usage
-        Thread.sleep(1000)
-        System.gc()
-        Thread.sleep(1000)
-        val usedBefore = memoryMBean.getHeapMemoryUsage.getUsed
-
-        val messagesSrc =
-          dao.messagesWithBatch(persistenceId, 0, totalMessages, batchSize = 
100, None)
-        val probe =
-          messagesSrc
-            .map {
-              case Success((repr, _)) =>
-                if (repr.sequenceNr % 100 == 0)
-                  log.info(s"fetched: ${repr.persistenceId} - 
${repr.sequenceNr}/$totalMessages")
-              case Failure(exception) =>
-                log.error("Failure when reading messages.", exception)
-            }
-            .runWith(TestSink.probe)
-
-        probe.request(10)
-        probe.within(20.seconds) {
-          probe.expectNextN(10)
-        }
-
-        // sleep and gc to have some kind of stable measurement of current 
heap usage
-        Thread.sleep(2000)
-        System.gc()
-        Thread.sleep(1000)
-        val usedAfter = memoryMBean.getHeapMemoryUsage.getUsed
-
-        log.info(s"Used heap before ${usedBefore / MB} MB, after ${usedAfter / 
MB} MB")
-        // actual usage is much less than 10 MB
-        (usedAfter - usedBefore) should be <= (10L * MB)
-
-        probe.cancel()
+      val maxMem = 64 * MB
+
+      val numberOfInsertBatches = {
+        // calculate the number of batches using a factor to make sure we go a 
little bit over the limit
+        (maxMem / (payloadSize * eventsPerBatch) * 1.2).round.toInt
+      }
+      val totalMessages = numberOfInsertBatches * eventsPerBatch
+      val totalMessagePayload = totalMessages * payloadSize
+      log.info(
+        s"batches: $numberOfInsertBatches (with $eventsPerBatch events), total 
messages: $totalMessages, total msgs size: $totalMessagePayload")
+
+      // payload can be the same when inserting to avoid unnecessary memory 
usage
+      val payload = Array.fill(payloadSize)('a'.toByte)
+
+      val lastInsert =
+        Source
+          .fromIterator(() => (1 to numberOfInsertBatches).iterator)
+          .mapAsync(1) { i =>
+            val end = i * eventsPerBatch
+            val start = end - (eventsPerBatch - 1)
+            log.info(s"batch $i - events from $start to $end")
+            val atomicWrites =
+              (start to end).map { j =>
+                AtomicWrite(immutable.Seq(PersistentRepr(payload, j, 
persistenceId, writerUuid = writerUuid)))
+              }
+            dao.asyncWriteMessages(atomicWrites).map(_ => i)
+          }
+          .runWith(Sink.last)
+
+      // wait until we write all messages
+      // being very generous, 1 second per message
+      lastInsert.futureValue(Timeout(totalMessages.seconds))
+
+      log.info("Events written, starting replay")
+
+      // sleep and gc to have some kind of stable measurement of current heap 
usage
+      Thread.sleep(1000)
+      System.gc()
+      Thread.sleep(1000)
+      val usedBefore = memoryMBean.getHeapMemoryUsage.getUsed
+
+      val messagesSrc =
+        dao.messagesWithBatch(persistenceId, 0, totalMessages, batchSize = 
fetchSize, None)
+      val probe =
+        messagesSrc
+          .map {
+            case Success((repr, _)) =>
+              if (repr.sequenceNr % 100 == 0)
+                log.info(s"fetched: ${repr.persistenceId} - 
${repr.sequenceNr}/$totalMessages")
+            case Failure(exception) =>
+              log.error("Failure when reading messages.", exception)
+          }
+          .runWith(TestSink.probe)
+
+      probe.request(10)
+      probe.within(20.seconds) {
+        probe.expectNextN(10)
       }
+
+      // sleep and gc to have some kind of stable measurement of current heap 
usage
+      Thread.sleep(2000)
+      System.gc()
+      Thread.sleep(1000)
+      val usedAfter = memoryMBean.getHeapMemoryUsage.getUsed
+
+      log.info(s"Used heap before ${usedBefore / MB} MB, after ${usedAfter / 
MB} MB")
+      // actual usage is much less than 10 MB
+      (usedAfter - usedBefore) should be <= (10L * MB)
+
+      probe.cancel()
     }
   }
 }
+
+class H2JournalDaoStreamMessagesMemoryTest extends 
JournalDaoStreamMessagesMemoryTest("h2-application.conf")
+    with H2Cleaner
diff --git 
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/QueryTestSpec.scala
 
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/QueryTestSpec.scala
index 9c4290c..c299fbe 100644
--- 
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/QueryTestSpec.scala
+++ 
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/QueryTestSpec.scala
@@ -15,30 +15,33 @@
 package org.apache.pekko.persistence.jdbc.query
 
 import org.apache.pekko
-import pekko.actor.{ ActorRef, ActorSystem, Props, Stash, Status }
-import pekko.pattern.ask
+import pekko.actor.{ ActorRef, ActorSystem, ExtendedActorSystem, Props, Stash, 
Status }
 import pekko.event.LoggingReceive
-import pekko.persistence.{ DeleteMessagesFailure, DeleteMessagesSuccess, 
PersistentActor }
+import pekko.pattern.ask
 import pekko.persistence.jdbc.SingleActorSystemPerTestSpec
+import pekko.persistence.jdbc.config.JournalConfig
+import pekko.persistence.jdbc.journal.dao.JournalDao
 import pekko.persistence.jdbc.query.EventAdapterTest.{ Event, 
TaggedAsyncEvent, TaggedEvent }
 import pekko.persistence.jdbc.query.javadsl.{ JdbcReadJournal => 
JavaJdbcReadJournal }
 import pekko.persistence.jdbc.query.scaladsl.JdbcReadJournal
+import pekko.persistence.jdbc.testkit.internal._
 import pekko.persistence.journal.Tagged
 import pekko.persistence.query.{ EventEnvelope, Offset, PersistenceQuery }
+import pekko.persistence.{ DeleteMessagesFailure, DeleteMessagesSuccess, 
PersistentActor }
+import pekko.serialization.{ Serialization, SerializationExtension }
 import pekko.stream.scaladsl.Sink
 import pekko.stream.testkit.TestSubscriber
 import pekko.stream.testkit.javadsl.{ TestSink => JavaSink }
 import pekko.stream.testkit.scaladsl.TestSink
 import pekko.stream.{ Materializer, SystemMaterializer }
 import com.typesafe.config.ConfigValue
+import slick.jdbc.JdbcBackend.Database
+import slick.jdbc.JdbcProfile
 
-import scala.concurrent.{ ExecutionContext, Future }
+import scala.collection.immutable
 import scala.concurrent.duration.{ FiniteDuration, _ }
-import pekko.persistence.jdbc.testkit.internal.H2
-import pekko.persistence.jdbc.testkit.internal.MySQL
-import pekko.persistence.jdbc.testkit.internal.Oracle
-import pekko.persistence.jdbc.testkit.internal.Postgres
-import pekko.persistence.jdbc.testkit.internal.SqlServer
+import scala.concurrent.{ ExecutionContext, Future }
+import scala.util.{ Failure, Success }
 
 trait ReadJournalOperations {
   def withCurrentPersistenceIds(within: FiniteDuration = 60.second)(f: 
TestSubscriber.Probe[String] => Unit): Unit
@@ -337,6 +340,23 @@ abstract class QueryTestSpec(config: String, 
configOverrides: Map[String, Config
 
   def withTags(payload: Any, tags: String*) = Tagged(payload, Set(tags: _*))
 
+  def withDao(f: JournalDao => Unit)(implicit system: ActorSystem, ec: 
ExecutionContext, mat: Materializer): Unit = {
+    val fqcn: String = journalConfig.pluginConfig.dao
+    val args: immutable.Seq[(Class[_], AnyRef)] = immutable.Seq(
+      (classOf[Database], db),
+      (classOf[JdbcProfile], profile),
+      (classOf[JournalConfig], journalConfig),
+      (classOf[Serialization], SerializationExtension(system)),
+      (classOf[ExecutionContext], ec),
+      (classOf[Materializer], mat))
+    val journalDao: JournalDao =
+      
system.asInstanceOf[ExtendedActorSystem].dynamicAccess.createInstanceFor[JournalDao](fqcn,
 args) match {
+        case Success(dao)   => dao
+        case Failure(cause) => throw cause
+      }
+    f(journalDao)
+  }
+
 }
 
 trait PostgresCleaner extends QueryTestSpec {


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to