This is an automated email from the ASF dual-hosted git repository.

roiocam pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-jdbc.git


The following commit(s) were added to refs/heads/main by this push:
     new 7939510  limit windowing support snapshoted events (#195)
7939510 is described below

commit 7939510b271546783a994a3c7da52426dc040024
Author: AndyChen(Jingzhang) <[email protected]>
AuthorDate: Tue May 28 09:46:33 2024 +0800

    limit windowing support snapshoted events (#195)
    
    * force valid in limit windowing
    
    * fix snapshoted events case
    
    * use full-windows for the first query
    
    * fix oracle insert issue
    
    * add tests to verify
---
 .../dao/BaseJournalDaoWithReadMessages.scala       |  9 +++--
 .../journal/dao/LimitWindowingStreamTest.scala     |  9 ++---
 .../query/CurrentEventsByPersistenceIdTest.scala   | 38 +++++++++++++++++++++
 .../integration/LimitWindowingStreamTest.scala     | 39 ++++++++++++++++++++++
 4 files changed, 89 insertions(+), 6 deletions(-)

diff --git a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala
index 629ac0c..30f3569 100644
--- a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala
+++ b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala
@@ -51,11 +51,16 @@ trait BaseJournalDaoWithReadMessages extends JournalDaoWithReadMessages {
       toSequenceNr: Long,
       batchSize: Int,
       refreshInterval: Option[(FiniteDuration, Scheduler)]) = {
+    val firstSequenceNr: Long = Math.max(1, fromSequenceNr)
     Source
-      .unfoldAsync[(Long, FlowControl), Seq[Try[(PersistentRepr, Long)]]]((Math.max(1, fromSequenceNr), Continue)) {
+      .unfoldAsync[(Long, FlowControl), Seq[Try[(PersistentRepr, Long)]]]((firstSequenceNr, Continue)) {
         case (from, control) =>
           def limitWindow(from: Long): Long = {
-            math.min(from + batchSize, toSequenceNr)
+            if (from == firstSequenceNr || batchSize <= 0 || (Long.MaxValue - batchSize) < from) {
+              toSequenceNr
+            } else {
+              Math.min(from + batchSize, toSequenceNr)
+            }
           }
 
           def retrieveNextBatch(): Future[Option[((Long, FlowControl), Seq[Try[(PersistentRepr, Long)]])]] = {
diff --git a/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/LimitWindowingStreamTest.scala b/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/LimitWindowingStreamTest.scala
index e9e8ccd..be2679f 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/LimitWindowingStreamTest.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/LimitWindowingStreamTest.scala
@@ -50,7 +50,8 @@ abstract class LimitWindowingStreamTest(configFile: String)
     implicit val mat: Materializer = SystemMaterializer(system).materializer
 
     val persistenceId = UUID.randomUUID().toString
-    val payload = 'a'.toByte
+    val writerUuid = UUID.randomUUID().toString
+    val payload = Array.fill(16)('a'.toByte)
     val eventsPerBatch = 1000
     val numberOfInsertBatches = 16
     val totalMessages = numberOfInsertBatches * eventsPerBatch
@@ -58,14 +59,14 @@ abstract class LimitWindowingStreamTest(configFile: String)
     withDao { dao =>
       val lastInsert =
         Source
-          .fromIterator(() => (1 to numberOfInsertBatches).toIterator)
+          .fromIterator(() => (1 to numberOfInsertBatches).iterator)
           .mapAsync(1) { i =>
             val end = i * eventsPerBatch
             val start = end - (eventsPerBatch - 1)
-            log.info(s"batch $i (events from $start to $end")
+            log.info(s"batch $i - events from $start to $end")
             val atomicWrites =
               (start to end).map { j =>
-                AtomicWrite(immutable.Seq(PersistentRepr(payload, j, persistenceId)))
+                AtomicWrite(immutable.Seq(PersistentRepr(payload, j, persistenceId, writerUuid = writerUuid)))
               }
             dao.asyncWriteMessages(atomicWrites).map(_ => i)
           }
diff --git a/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/CurrentEventsByPersistenceIdTest.scala b/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/CurrentEventsByPersistenceIdTest.scala
index 1124241..efc45ca 100644
--- a/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/CurrentEventsByPersistenceIdTest.scala
+++ b/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/CurrentEventsByPersistenceIdTest.scala
@@ -22,6 +22,8 @@ import pekko.persistence.query.Offset
 import pekko.persistence.query.{ EventEnvelope, Sequence }
 import pekko.testkit.TestProbe
 
+import scala.concurrent.Future
+
 abstract class CurrentEventsByPersistenceIdTest(config: String) extends QueryTestSpec(config) {
   import QueryTestSpec.EventEnvelopeProbeOps
 
@@ -219,6 +221,42 @@ abstract class CurrentEventsByPersistenceIdTest(config: String) extends QueryTes
       }
     }
   }
+
+  it should "return event when has been archived more than batch size" in withActorSystem { implicit system =>
+    import pekko.pattern.ask
+    import system.dispatcher
+    import scala.concurrent.duration._
+
+    val journalOps = new JavaDslJdbcReadJournalOperations(system)
+    val batchSize = readJournalConfig.maxBufferSize
+    withTestActors(replyToMessages = true) { (actor1, _, _) =>
+      def sendMessages(numberOfMessages: Int): Future[Done] = {
+        val futures = for (i <- 1 to numberOfMessages) yield {
+          actor1 ? i
+        }
+        Future.sequence(futures).map(_ => Done)
+      }
+
+      val numberOfEvents = batchSize << 2
+      val archiveEventSum = numberOfEvents >> 1
+      val batch = sendMessages(numberOfEvents)
+
+      // wait for acknowledgement of the batch
+      batch.futureValue
+
+      // and then archive some of event(delete it).
+      val deleteBatch = for (i <- 1 to archiveEventSum) yield {
+        actor1 ? DeleteCmd(i)
+      }
+      // blocking until all delete commands are processed
+      Future.sequence(deleteBatch).futureValue
+
+      journalOps.withCurrentEventsByPersistenceId()("my-1", 0, Long.MaxValue) { tp =>
+        val allEvents = tp.toStrict(atMost = 10.seconds)
+        allEvents.size shouldBe (numberOfEvents - archiveEventSum)
+      }
+    }
+  }
 }
 
 // Note: these tests use the shared-db configs, the test for all (so not only current) events use the regular db config
diff --git a/integration-test/src/test/scala/org/apache/pekko/persistence/jdbc/integration/LimitWindowingStreamTest.scala b/integration-test/src/test/scala/org/apache/pekko/persistence/jdbc/integration/LimitWindowingStreamTest.scala
new file mode 100644
index 0000000..6ffcbfb
--- /dev/null
+++ b/integration-test/src/test/scala/org/apache/pekko/persistence/jdbc/integration/LimitWindowingStreamTest.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pekko.persistence.jdbc.integration
+
+import org.apache.pekko.persistence.jdbc.journal.dao.LimitWindowingStreamTest
+import org.apache.pekko.persistence.jdbc.query.{ MysqlCleaner, OracleCleaner, PostgresCleaner, SqlServerCleaner }
+
+class PostgresLimitWindowingStreamTest
+    extends LimitWindowingStreamTest("postgres-application.conf")
+    with PostgresCleaner
+
+class MySQLLimitWindowingStreamTest
+    extends LimitWindowingStreamTest("mysql-application.conf")
+    with MysqlCleaner
+
+class OracleLimitWindowingStreamTest
+    extends LimitWindowingStreamTest("oracle-application.conf")
+    with OracleCleaner
+
+class SqlServerLimitWindowingStreamTest
+    extends LimitWindowingStreamTest("sqlserver-application.conf")
+    with SqlServerCleaner


---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]

Reply via email to