This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch main in repository https://gitbox.apache.org/repos/asf/pekko-persistence-jdbc.git
commit a1ace88d66e297238045213af3b646d768992dd3
Author: JingZhang Chen <[email protected]>
AuthorDate: Fri Apr 12 10:15:29 2024 +0800

    use new Source.queue to avoid memory leak
---
 .../mima-filters/1.1.x.backwards.excludes/BaseDao.excludes  |  1 +
 .../apache/pekko/persistence/jdbc/journal/dao/BaseDao.scala | 10 +++++-----
 2 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/core/src/main/mima-filters/1.1.x.backwards.excludes/BaseDao.excludes b/core/src/main/mima-filters/1.1.x.backwards.excludes/BaseDao.excludes
new file mode 100644
index 0000000..76a9065
--- /dev/null
+++ b/core/src/main/mima-filters/1.1.x.backwards.excludes/BaseDao.excludes
@@ -0,0 +1 @@
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.persistence.jdbc.journal.dao.BaseDao.writeQueue")
diff --git a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseDao.scala b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseDao.scala
index 333b2c5..9b58cb6 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseDao.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseDao.scala
@@ -16,8 +16,8 @@ package org.apache.pekko.persistence.jdbc.journal.dao
 
 import org.apache.pekko
 import pekko.persistence.jdbc.config.BaseDaoConfig
-import pekko.stream.scaladsl.{ Keep, Sink, Source, SourceQueueWithComplete }
-import pekko.stream.{ Materializer, OverflowStrategy, QueueOfferResult }
+import pekko.stream.scaladsl.{ Keep, Sink, Source }
+import pekko.stream.{ BoundedSourceQueue, Materializer, QueueOfferResult }
 
 import scala.collection.immutable.{ Seq, Vector }
 import scala.concurrent.{ ExecutionContext, Future, Promise }
@@ -29,8 +29,8 @@ abstract class BaseDao[T] {
 
   def baseDaoConfig: BaseDaoConfig
 
-  val writeQueue: SourceQueueWithComplete[(Promise[Unit], Seq[T])] = Source
-    .queue[(Promise[Unit], Seq[T])](baseDaoConfig.bufferSize, OverflowStrategy.dropNew)
+  val writeQueue: BoundedSourceQueue[(Promise[Unit], Seq[T])] = Source
+    .queue[(Promise[Unit], Seq[T])](baseDaoConfig.bufferSize)
     .batchWeighted[(Seq[Promise[Unit]], Seq[T])](baseDaoConfig.batchSize, _._2.size, tup => Vector(tup._1) -> tup._2) {
       case ((promises, rows), (newPromise, newRows)) => (promises :+ newPromise) -> (rows ++ newRows)
     }
@@ -46,7 +46,7 @@ abstract class BaseDao[T] {
 
   def queueWriteJournalRows(xs: Seq[T]): Future[Unit] = {
     val promise = Promise[Unit]()
-    writeQueue.offer(promise -> xs).flatMap {
+    writeQueue.offer(promise -> xs) match {
       case QueueOfferResult.Enqueued =>
         promise.future
       case QueueOfferResult.Failure(t) =>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]
