This is an automated email from the ASF dual-hosted git repository.
roiocam pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-jdbc.git
The following commit(s) were added to refs/heads/main by this push:
new ab2f6ce perf: avoid large offset query via limit windowing (#180)
ab2f6ce is described below
commit ab2f6ce103ff7cb5de73bbebc58752330b499dba
Author: AndyChen(Jingzhang) <[email protected]>
AuthorDate: Mon May 27 09:11:03 2024 +0800
perf: avoid large offset query via limit windowing (#180)
* perf: avoid large offset query via limit windowing
* add unit tests, simplify legacy
* make legacy IT work, new DAO IT
* fix config
* use single unit test
* optimized it
* fix
* scala 2.12 build, oracle npe
* fix scala 2.12 build, oracle npe
* optimized imports
* optimized import
---
.../dao/BaseJournalDaoWithReadMessages.scala | 23 ++-
.../journal/dao/LimitWindowingStreamTest.scala | 91 ++++++++++
.../query/JournalDaoStreamMessagesMemoryTest.scala | 195 ++++++++++-----------
.../persistence/jdbc/query/QueryTestSpec.scala | 38 +++-
4 files changed, 231 insertions(+), 116 deletions(-)
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala
index dee3ba6..629ac0c 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala
@@ -17,14 +17,14 @@ package org.apache.pekko.persistence.jdbc.journal.dao
import org.apache.pekko
import pekko.NotUsed
import pekko.actor.Scheduler
+import pekko.annotation.InternalApi
import pekko.persistence.PersistentRepr
import pekko.persistence.jdbc.journal.dao.FlowControl.{ Continue,
ContinueDelayed, Stop }
import pekko.stream.Materializer
import pekko.stream.scaladsl.{ Sink, Source }
-import scala.collection.immutable.Seq
-import scala.concurrent.{ ExecutionContext, Future }
import scala.concurrent.duration.FiniteDuration
+import scala.concurrent.{ ExecutionContext, Future }
import scala.util.{ Failure, Success, Try }
trait BaseJournalDaoWithReadMessages extends JournalDaoWithReadMessages {
@@ -38,13 +38,29 @@ trait BaseJournalDaoWithReadMessages extends
JournalDaoWithReadMessages {
toSequenceNr: Long,
batchSize: Int,
refreshInterval: Option[(FiniteDuration, Scheduler)]):
Source[Try[(PersistentRepr, Long)], NotUsed] = {
+ internalBatchStream(persistenceId, fromSequenceNr, toSequenceNr,
batchSize, refreshInterval).mapConcat(identity)
+ }
+ /**
+ * separate this method for unit tests.
+ */
+ @InternalApi
+ private[dao] def internalBatchStream(
+ persistenceId: String,
+ fromSequenceNr: Long,
+ toSequenceNr: Long,
+ batchSize: Int,
+ refreshInterval: Option[(FiniteDuration, Scheduler)]) = {
Source
.unfoldAsync[(Long, FlowControl), Seq[Try[(PersistentRepr,
Long)]]]((Math.max(1, fromSequenceNr), Continue)) {
case (from, control) =>
+ def limitWindow(from: Long): Long = {
+ math.min(from + batchSize, toSequenceNr)
+ }
+
def retrieveNextBatch(): Future[Option[((Long, FlowControl),
Seq[Try[(PersistentRepr, Long)]])]] = {
for {
- xs <- messages(persistenceId, from, toSequenceNr,
batchSize).runWith(Sink.seq)
+ xs <- messages(persistenceId, from, limitWindow(from),
batchSize).runWith(Sink.seq)
} yield {
val hasMoreEvents = xs.size == batchSize
// Events are ordered by sequence number, therefore the last one
is the largest)
@@ -77,7 +93,6 @@ trait BaseJournalDaoWithReadMessages extends
JournalDaoWithReadMessages {
pekko.pattern.after(delay, scheduler)(retrieveNextBatch())
}
}
- .mapConcat(identity(_))
}
}
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/LimitWindowingStreamTest.scala
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/LimitWindowingStreamTest.scala
new file mode 100644
index 0000000..e9e8ccd
--- /dev/null
+++
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/LimitWindowingStreamTest.scala
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pekko.persistence.jdbc.journal.dao
+
+import org.apache.pekko
+import pekko.persistence.jdbc.journal.dao.LimitWindowingStreamTest.fetchSize
+import pekko.persistence.jdbc.query.{ H2Cleaner, QueryTestSpec }
+import pekko.persistence.{ AtomicWrite, PersistentRepr }
+import pekko.stream.scaladsl.{ Keep, Sink, Source }
+import pekko.stream.{ Materializer, SystemMaterializer }
+import com.typesafe.config.{ ConfigValue, ConfigValueFactory }
+import org.scalatest.concurrent.PatienceConfiguration.Timeout
+import org.slf4j.LoggerFactory
+
+import java.util.UUID
+import scala.collection.immutable
+import scala.concurrent.duration._
+import scala.concurrent.{ Await, ExecutionContext, Future }
+
+object LimitWindowingStreamTest {
+ val fetchSize = 100
+ val configOverrides: Map[String, ConfigValue] =
+ Map("jdbc-journal.fetch-size" -> ConfigValueFactory.fromAnyRef(fetchSize))
+}
+
+abstract class LimitWindowingStreamTest(configFile: String)
+ extends QueryTestSpec(configFile,
LimitWindowingStreamTest.configOverrides) {
+
+ private val log = LoggerFactory.getLogger(this.getClass)
+
+ it should "stream events with limit windowing" in withActorSystem { implicit
system =>
+ implicit val ec: ExecutionContext = system.dispatcher
+ implicit val mat: Materializer = SystemMaterializer(system).materializer
+
+ val persistenceId = UUID.randomUUID().toString
+ val payload = 'a'.toByte
+ val eventsPerBatch = 1000
+ val numberOfInsertBatches = 16
+ val totalMessages = numberOfInsertBatches * eventsPerBatch
+
+ withDao { dao =>
+ val lastInsert =
+ Source
+ .fromIterator(() => (1 to numberOfInsertBatches).toIterator)
+ .mapAsync(1) { i =>
+ val end = i * eventsPerBatch
+ val start = end - (eventsPerBatch - 1)
+ log.info(s"batch $i (events from $start to $end")
+ val atomicWrites =
+ (start to end).map { j =>
+ AtomicWrite(immutable.Seq(PersistentRepr(payload, j,
persistenceId)))
+ }
+ dao.asyncWriteMessages(atomicWrites).map(_ => i)
+ }
+ .runWith(Sink.last)
+
+ lastInsert.futureValue(Timeout(totalMessages.seconds))
+ val readMessagesDao = dao.asInstanceOf[BaseJournalDaoWithReadMessages]
+ val messagesSrc =
+ readMessagesDao.internalBatchStream(persistenceId, 0, totalMessages,
batchSize = fetchSize, None)
+
+ val eventualSum: Future[(Int, Int)] = messagesSrc.toMat(Sink.fold((0,
0)) { case ((accBatch, accTotal), seq) =>
+ (accBatch + 1, accTotal + seq.size)
+ })(Keep.right).run()
+
+ val (batchCount, totalCount) = Await.result(eventualSum, Duration.Inf)
+ val totalBatch = totalMessages / fetchSize
+ batchCount shouldBe totalBatch
+ totalCount shouldBe totalMessages
+ }
+ }
+}
+
+class H2LimitWindowingStreamTest extends
LimitWindowingStreamTest("h2-application.conf") with H2Cleaner
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/JournalDaoStreamMessagesMemoryTest.scala
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/JournalDaoStreamMessagesMemoryTest.scala
index 02625e7..216d7b8 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/JournalDaoStreamMessagesMemoryTest.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/JournalDaoStreamMessagesMemoryTest.scala
@@ -14,16 +14,15 @@
package org.apache.pekko.persistence.jdbc.query
-import java.lang.management.ManagementFactory
-import java.lang.management.MemoryMXBean
+import java.lang.management.{ ManagementFactory, MemoryMXBean }
import java.util.UUID
import org.apache.pekko
-import pekko.actor.ActorSystem
+import
pekko.persistence.jdbc.query.JournalDaoStreamMessagesMemoryTest.fetchSize
import pekko.persistence.{ AtomicWrite, PersistentRepr }
-import pekko.persistence.jdbc.journal.dao.legacy.{ ByteArrayJournalDao,
JournalTables }
-import pekko.serialization.SerializationExtension
import pekko.stream.scaladsl.{ Sink, Source }
+import pekko.stream.testkit.scaladsl.TestSink
+import pekko.stream.{ Materializer, SystemMaterializer }
import com.typesafe.config.{ ConfigValue, ConfigValueFactory }
import org.scalatest.concurrent.PatienceConfiguration.Timeout
import org.slf4j.LoggerFactory
@@ -32,120 +31,110 @@ import scala.collection.immutable
import scala.concurrent.ExecutionContext
import scala.concurrent.duration._
import scala.util.{ Failure, Success }
-import pekko.stream.testkit.scaladsl.TestSink
-import org.scalatest.matchers.should.Matchers
object JournalDaoStreamMessagesMemoryTest {
- val configOverrides: Map[String, ConfigValue] =
Map("jdbc-journal.fetch-size" -> ConfigValueFactory.fromAnyRef("100"))
+ val fetchSize: Int = 100
+ val MB: Int = 1024 * 1024
- val MB = 1024 * 1024
+ val configOverrides: Map[String, ConfigValue] = Map(
+ "jdbc-journal.fetch-size" -> ConfigValueFactory.fromAnyRef("100"))
}
abstract class JournalDaoStreamMessagesMemoryTest(configFile: String)
- extends QueryTestSpec(configFile,
JournalDaoStreamMessagesMemoryTest.configOverrides)
- with JournalTables
- with Matchers {
+ extends QueryTestSpec(configFile,
JournalDaoStreamMessagesMemoryTest.configOverrides) {
+
import JournalDaoStreamMessagesMemoryTest.MB
private val log = LoggerFactory.getLogger(this.getClass)
- val journalSequenceActorConfig =
readJournalConfig.journalSequenceRetrievalConfiguration
- val journalTableCfg = journalConfig.journalTableConfiguration
+ val memoryMBean: MemoryMXBean = ManagementFactory.getMemoryMXBean
- implicit val askTimeout: FiniteDuration = 50.millis
+ it should "stream events" in withActorSystem { implicit system =>
+ implicit val ec: ExecutionContext = system.dispatcher
+ implicit val mat: Materializer = SystemMaterializer(system).materializer
- def generateId: Int = 0
+ withDao { dao =>
+ val persistenceId = UUID.randomUUID().toString
- val memoryMBean: MemoryMXBean = ManagementFactory.getMemoryMXBean
+ val writerUuid = UUID.randomUUID().toString
+
+ val payloadSize = 5000 // 5000 bytes
+ val eventsPerBatch = 1000
- behavior.of("Replaying Persistence Actor")
-
- it should "stream events" in {
- if (newDao)
- pending
- withActorSystem { implicit system: ActorSystem =>
- withDatabase { db =>
- implicit val ec: ExecutionContext = system.dispatcher
-
- val persistenceId = UUID.randomUUID().toString
- val dao = new ByteArrayJournalDao(db, profile, journalConfig,
SerializationExtension(system))
-
- val payloadSize = 5000 // 5000 bytes
- val eventsPerBatch = 1000
-
- val maxMem = 64 * MB
-
- val numberOfInsertBatches = {
- // calculate the number of batches using a factor to make sure we go
a little bit over the limit
- (maxMem / (payloadSize * eventsPerBatch) * 1.2).round.toInt
- }
- val totalMessages = numberOfInsertBatches * eventsPerBatch
- val totalMessagePayload = totalMessages * payloadSize
- log.info(
- s"batches: $numberOfInsertBatches (with $eventsPerBatch events),
total messages: $totalMessages, total msgs size: $totalMessagePayload")
-
- // payload can be the same when inserting to avoid unnecessary memory
usage
- val payload = Array.fill(payloadSize)('a'.toByte)
-
- val lastInsert =
- Source
- .fromIterator(() => (1 to numberOfInsertBatches).toIterator)
- .mapAsync(1) { i =>
- val end = i * eventsPerBatch
- val start = end - (eventsPerBatch - 1)
- log.info(s"batch $i - events from $start to $end")
- val atomicWrites =
- (start to end).map { j =>
- AtomicWrite(immutable.Seq(PersistentRepr(payload, j,
persistenceId)))
- }.toSeq
-
- dao.asyncWriteMessages(atomicWrites).map(_ => i)
- }
- .runWith(Sink.last)
-
- // wait until we write all messages
- // being very generous, 1 second per message
- lastInsert.futureValue(Timeout(totalMessages.seconds))
-
- log.info("Events written, starting replay")
-
- // sleep and gc to have some kind of stable measurement of current
heap usage
- Thread.sleep(1000)
- System.gc()
- Thread.sleep(1000)
- val usedBefore = memoryMBean.getHeapMemoryUsage.getUsed
-
- val messagesSrc =
- dao.messagesWithBatch(persistenceId, 0, totalMessages, batchSize =
100, None)
- val probe =
- messagesSrc
- .map {
- case Success((repr, _)) =>
- if (repr.sequenceNr % 100 == 0)
- log.info(s"fetched: ${repr.persistenceId} -
${repr.sequenceNr}/$totalMessages")
- case Failure(exception) =>
- log.error("Failure when reading messages.", exception)
- }
- .runWith(TestSink.probe)
-
- probe.request(10)
- probe.within(20.seconds) {
- probe.expectNextN(10)
- }
-
- // sleep and gc to have some kind of stable measurement of current
heap usage
- Thread.sleep(2000)
- System.gc()
- Thread.sleep(1000)
- val usedAfter = memoryMBean.getHeapMemoryUsage.getUsed
-
- log.info(s"Used heap before ${usedBefore / MB} MB, after ${usedAfter /
MB} MB")
- // actual usage is much less than 10 MB
- (usedAfter - usedBefore) should be <= (10L * MB)
-
- probe.cancel()
+ val maxMem = 64 * MB
+
+ val numberOfInsertBatches = {
+ // calculate the number of batches using a factor to make sure we go a
little bit over the limit
+ (maxMem / (payloadSize * eventsPerBatch) * 1.2).round.toInt
+ }
+ val totalMessages = numberOfInsertBatches * eventsPerBatch
+ val totalMessagePayload = totalMessages * payloadSize
+ log.info(
+ s"batches: $numberOfInsertBatches (with $eventsPerBatch events), total
messages: $totalMessages, total msgs size: $totalMessagePayload")
+
+ // payload can be the same when inserting to avoid unnecessary memory
usage
+ val payload = Array.fill(payloadSize)('a'.toByte)
+
+ val lastInsert =
+ Source
+ .fromIterator(() => (1 to numberOfInsertBatches).iterator)
+ .mapAsync(1) { i =>
+ val end = i * eventsPerBatch
+ val start = end - (eventsPerBatch - 1)
+ log.info(s"batch $i - events from $start to $end")
+ val atomicWrites =
+ (start to end).map { j =>
+ AtomicWrite(immutable.Seq(PersistentRepr(payload, j,
persistenceId, writerUuid = writerUuid)))
+ }
+ dao.asyncWriteMessages(atomicWrites).map(_ => i)
+ }
+ .runWith(Sink.last)
+
+ // wait until we write all messages
+ // being very generous, 1 second per message
+ lastInsert.futureValue(Timeout(totalMessages.seconds))
+
+ log.info("Events written, starting replay")
+
+ // sleep and gc to have some kind of stable measurement of current heap
usage
+ Thread.sleep(1000)
+ System.gc()
+ Thread.sleep(1000)
+ val usedBefore = memoryMBean.getHeapMemoryUsage.getUsed
+
+ val messagesSrc =
+ dao.messagesWithBatch(persistenceId, 0, totalMessages, batchSize =
fetchSize, None)
+ val probe =
+ messagesSrc
+ .map {
+ case Success((repr, _)) =>
+ if (repr.sequenceNr % 100 == 0)
+ log.info(s"fetched: ${repr.persistenceId} -
${repr.sequenceNr}/$totalMessages")
+ case Failure(exception) =>
+ log.error("Failure when reading messages.", exception)
+ }
+ .runWith(TestSink.probe)
+
+ probe.request(10)
+ probe.within(20.seconds) {
+ probe.expectNextN(10)
}
+
+ // sleep and gc to have some kind of stable measurement of current heap
usage
+ Thread.sleep(2000)
+ System.gc()
+ Thread.sleep(1000)
+ val usedAfter = memoryMBean.getHeapMemoryUsage.getUsed
+
+ log.info(s"Used heap before ${usedBefore / MB} MB, after ${usedAfter /
MB} MB")
+ // actual usage is much less than 10 MB
+ (usedAfter - usedBefore) should be <= (10L * MB)
+
+ probe.cancel()
}
}
}
+
+class H2JournalDaoStreamMessagesMemoryTest extends
JournalDaoStreamMessagesMemoryTest("h2-application.conf")
+ with H2Cleaner
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/QueryTestSpec.scala
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/QueryTestSpec.scala
index 9c4290c..c299fbe 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/QueryTestSpec.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/QueryTestSpec.scala
@@ -15,30 +15,33 @@
package org.apache.pekko.persistence.jdbc.query
import org.apache.pekko
-import pekko.actor.{ ActorRef, ActorSystem, Props, Stash, Status }
-import pekko.pattern.ask
+import pekko.actor.{ ActorRef, ActorSystem, ExtendedActorSystem, Props, Stash,
Status }
import pekko.event.LoggingReceive
-import pekko.persistence.{ DeleteMessagesFailure, DeleteMessagesSuccess,
PersistentActor }
+import pekko.pattern.ask
import pekko.persistence.jdbc.SingleActorSystemPerTestSpec
+import pekko.persistence.jdbc.config.JournalConfig
+import pekko.persistence.jdbc.journal.dao.JournalDao
import pekko.persistence.jdbc.query.EventAdapterTest.{ Event,
TaggedAsyncEvent, TaggedEvent }
import pekko.persistence.jdbc.query.javadsl.{ JdbcReadJournal =>
JavaJdbcReadJournal }
import pekko.persistence.jdbc.query.scaladsl.JdbcReadJournal
+import pekko.persistence.jdbc.testkit.internal._
import pekko.persistence.journal.Tagged
import pekko.persistence.query.{ EventEnvelope, Offset, PersistenceQuery }
+import pekko.persistence.{ DeleteMessagesFailure, DeleteMessagesSuccess,
PersistentActor }
+import pekko.serialization.{ Serialization, SerializationExtension }
import pekko.stream.scaladsl.Sink
import pekko.stream.testkit.TestSubscriber
import pekko.stream.testkit.javadsl.{ TestSink => JavaSink }
import pekko.stream.testkit.scaladsl.TestSink
import pekko.stream.{ Materializer, SystemMaterializer }
import com.typesafe.config.ConfigValue
+import slick.jdbc.JdbcBackend.Database
+import slick.jdbc.JdbcProfile
-import scala.concurrent.{ ExecutionContext, Future }
+import scala.collection.immutable
import scala.concurrent.duration.{ FiniteDuration, _ }
-import pekko.persistence.jdbc.testkit.internal.H2
-import pekko.persistence.jdbc.testkit.internal.MySQL
-import pekko.persistence.jdbc.testkit.internal.Oracle
-import pekko.persistence.jdbc.testkit.internal.Postgres
-import pekko.persistence.jdbc.testkit.internal.SqlServer
+import scala.concurrent.{ ExecutionContext, Future }
+import scala.util.{ Failure, Success }
trait ReadJournalOperations {
def withCurrentPersistenceIds(within: FiniteDuration = 60.second)(f:
TestSubscriber.Probe[String] => Unit): Unit
@@ -337,6 +340,23 @@ abstract class QueryTestSpec(config: String,
configOverrides: Map[String, Config
def withTags(payload: Any, tags: String*) = Tagged(payload, Set(tags: _*))
+ def withDao(f: JournalDao => Unit)(implicit system: ActorSystem, ec:
ExecutionContext, mat: Materializer): Unit = {
+ val fqcn: String = journalConfig.pluginConfig.dao
+ val args: immutable.Seq[(Class[_], AnyRef)] = immutable.Seq(
+ (classOf[Database], db),
+ (classOf[JdbcProfile], profile),
+ (classOf[JournalConfig], journalConfig),
+ (classOf[Serialization], SerializationExtension(system)),
+ (classOf[ExecutionContext], ec),
+ (classOf[Materializer], mat))
+ val journalDao: JournalDao =
+
system.asInstanceOf[ExtendedActorSystem].dynamicAccess.createInstanceFor[JournalDao](fqcn,
args) match {
+ case Success(dao) => dao
+ case Failure(cause) => throw cause
+ }
+ f(journalDao)
+ }
+
}
trait PostgresCleaner extends QueryTestSpec {
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]