This is an automated email from the ASF dual-hosted git repository.

hepin pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-jdbc.git

commit a1ace88d66e297238045213af3b646d768992dd3
Author: JingZhang Chen <[email protected]>
AuthorDate: Fri Apr 12 10:15:29 2024 +0800

    use new Source.queue to avoid memory leak
---
 .../mima-filters/1.1.x.backwards.excludes/BaseDao.excludes     |  1 +
 .../apache/pekko/persistence/jdbc/journal/dao/BaseDao.scala    | 10 +++++-----
 2 files changed, 6 insertions(+), 5 deletions(-)

diff --git a/core/src/main/mima-filters/1.1.x.backwards.excludes/BaseDao.excludes b/core/src/main/mima-filters/1.1.x.backwards.excludes/BaseDao.excludes
new file mode 100644
index 0000000..76a9065
--- /dev/null
+++ b/core/src/main/mima-filters/1.1.x.backwards.excludes/BaseDao.excludes
@@ -0,0 +1 @@
+ProblemFilters.exclude[IncompatibleResultTypeProblem]("org.apache.pekko.persistence.jdbc.journal.dao.BaseDao.writeQueue")
diff --git a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseDao.scala b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseDao.scala
index 333b2c5..9b58cb6 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseDao.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseDao.scala
@@ -16,8 +16,8 @@ package org.apache.pekko.persistence.jdbc.journal.dao
 
 import org.apache.pekko
 import pekko.persistence.jdbc.config.BaseDaoConfig
-import pekko.stream.scaladsl.{ Keep, Sink, Source, SourceQueueWithComplete }
-import pekko.stream.{ Materializer, OverflowStrategy, QueueOfferResult }
+import pekko.stream.scaladsl.{ Keep, Sink, Source }
+import pekko.stream.{ BoundedSourceQueue, Materializer, QueueOfferResult }
 
 import scala.collection.immutable.{ Seq, Vector }
 import scala.concurrent.{ ExecutionContext, Future, Promise }
@@ -29,8 +29,8 @@ abstract class BaseDao[T] {
 
   def baseDaoConfig: BaseDaoConfig
 
-  val writeQueue: SourceQueueWithComplete[(Promise[Unit], Seq[T])] = Source
-    .queue[(Promise[Unit], Seq[T])](baseDaoConfig.bufferSize, OverflowStrategy.dropNew)
+  val writeQueue: BoundedSourceQueue[(Promise[Unit], Seq[T])] = Source
+    .queue[(Promise[Unit], Seq[T])](baseDaoConfig.bufferSize)
     .batchWeighted[(Seq[Promise[Unit]], Seq[T])](baseDaoConfig.batchSize, _._2.size, tup => Vector(tup._1) -> tup._2) {
       case ((promises, rows), (newPromise, newRows)) => (promises :+ newPromise) -> (rows ++ newRows)
     }
@@ -46,7 +46,7 @@ abstract class BaseDao[T] {
 
   def queueWriteJournalRows(xs: Seq[T]): Future[Unit] = {
     val promise = Promise[Unit]()
-    writeQueue.offer(promise -> xs).flatMap {
+    writeQueue.offer(promise -> xs) match {
       case QueueOfferResult.Enqueued =>
         promise.future
       case QueueOfferResult.Failure(t) =>


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to