This is an automated email from the ASF dual-hosted git repository.
roiocam pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/pekko-persistence-jdbc.git
The following commit(s) were added to refs/heads/main by this push:
new 7939510 limit windowing support snapshoted events (#195)
7939510 is described below
commit 7939510b271546783a994a3c7da52426dc040024
Author: AndyChen(Jingzhang) <[email protected]>
AuthorDate: Tue May 28 09:46:33 2024 +0800
limit windowing support snapshoted events (#195)
* force valid in limit windowing
* fix snapshoted events case
* use full-windows for the first query
* fix oracle insert issue
* add tests to verify
---
.../dao/BaseJournalDaoWithReadMessages.scala | 9 +++--
.../journal/dao/LimitWindowingStreamTest.scala | 9 ++---
.../query/CurrentEventsByPersistenceIdTest.scala | 38 +++++++++++++++++++++
.../integration/LimitWindowingStreamTest.scala | 39 ++++++++++++++++++++++
4 files changed, 89 insertions(+), 6 deletions(-)
diff --git
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala
index 629ac0c..30f3569 100644
---
a/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala
+++
b/core/src/main/scala/org/apache/pekko/persistence/jdbc/journal/dao/BaseJournalDaoWithReadMessages.scala
@@ -51,11 +51,16 @@ trait BaseJournalDaoWithReadMessages extends
JournalDaoWithReadMessages {
toSequenceNr: Long,
batchSize: Int,
refreshInterval: Option[(FiniteDuration, Scheduler)]) = {
+ val firstSequenceNr: Long = Math.max(1, fromSequenceNr)
Source
- .unfoldAsync[(Long, FlowControl), Seq[Try[(PersistentRepr,
Long)]]]((Math.max(1, fromSequenceNr), Continue)) {
+ .unfoldAsync[(Long, FlowControl), Seq[Try[(PersistentRepr,
Long)]]]((firstSequenceNr, Continue)) {
case (from, control) =>
def limitWindow(from: Long): Long = {
- math.min(from + batchSize, toSequenceNr)
+ if (from == firstSequenceNr || batchSize <= 0 || (Long.MaxValue -
batchSize) < from) {
+ toSequenceNr
+ } else {
+ Math.min(from + batchSize, toSequenceNr)
+ }
}
def retrieveNextBatch(): Future[Option[((Long, FlowControl),
Seq[Try[(PersistentRepr, Long)]])]] = {
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/LimitWindowingStreamTest.scala
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/LimitWindowingStreamTest.scala
index e9e8ccd..be2679f 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/LimitWindowingStreamTest.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/journal/dao/LimitWindowingStreamTest.scala
@@ -50,7 +50,8 @@ abstract class LimitWindowingStreamTest(configFile: String)
implicit val mat: Materializer = SystemMaterializer(system).materializer
val persistenceId = UUID.randomUUID().toString
- val payload = 'a'.toByte
+ val writerUuid = UUID.randomUUID().toString
+ val payload = Array.fill(16)('a'.toByte)
val eventsPerBatch = 1000
val numberOfInsertBatches = 16
val totalMessages = numberOfInsertBatches * eventsPerBatch
@@ -58,14 +59,14 @@ abstract class LimitWindowingStreamTest(configFile: String)
withDao { dao =>
val lastInsert =
Source
- .fromIterator(() => (1 to numberOfInsertBatches).toIterator)
+ .fromIterator(() => (1 to numberOfInsertBatches).iterator)
.mapAsync(1) { i =>
val end = i * eventsPerBatch
val start = end - (eventsPerBatch - 1)
- log.info(s"batch $i (events from $start to $end")
+ log.info(s"batch $i - events from $start to $end")
val atomicWrites =
(start to end).map { j =>
- AtomicWrite(immutable.Seq(PersistentRepr(payload, j,
persistenceId)))
+ AtomicWrite(immutable.Seq(PersistentRepr(payload, j,
persistenceId, writerUuid = writerUuid)))
}
dao.asyncWriteMessages(atomicWrites).map(_ => i)
}
diff --git
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/CurrentEventsByPersistenceIdTest.scala
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/CurrentEventsByPersistenceIdTest.scala
index 1124241..efc45ca 100644
---
a/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/CurrentEventsByPersistenceIdTest.scala
+++
b/core/src/test/scala/org/apache/pekko/persistence/jdbc/query/CurrentEventsByPersistenceIdTest.scala
@@ -22,6 +22,8 @@ import pekko.persistence.query.Offset
import pekko.persistence.query.{ EventEnvelope, Sequence }
import pekko.testkit.TestProbe
+import scala.concurrent.Future
+
abstract class CurrentEventsByPersistenceIdTest(config: String) extends
QueryTestSpec(config) {
import QueryTestSpec.EventEnvelopeProbeOps
@@ -219,6 +221,42 @@ abstract class CurrentEventsByPersistenceIdTest(config:
String) extends QueryTes
}
}
}
+
+ it should "return event when has been archived more than batch size" in
withActorSystem { implicit system =>
+ import pekko.pattern.ask
+ import system.dispatcher
+ import scala.concurrent.duration._
+
+ val journalOps = new JavaDslJdbcReadJournalOperations(system)
+ val batchSize = readJournalConfig.maxBufferSize
+ withTestActors(replyToMessages = true) { (actor1, _, _) =>
+ def sendMessages(numberOfMessages: Int): Future[Done] = {
+ val futures = for (i <- 1 to numberOfMessages) yield {
+ actor1 ? i
+ }
+ Future.sequence(futures).map(_ => Done)
+ }
+
+ val numberOfEvents = batchSize << 2
+ val archiveEventSum = numberOfEvents >> 1
+ val batch = sendMessages(numberOfEvents)
+
+ // wait for acknowledgement of the batch
+ batch.futureValue
+
+ // and then archive some of event(delete it).
+ val deleteBatch = for (i <- 1 to archiveEventSum) yield {
+ actor1 ? DeleteCmd(i)
+ }
+ // blocking until all delete commands are processed
+ Future.sequence(deleteBatch).futureValue
+
+ journalOps.withCurrentEventsByPersistenceId()("my-1", 0, Long.MaxValue)
{ tp =>
+ val allEvents = tp.toStrict(atMost = 10.seconds)
+ allEvents.size shouldBe (numberOfEvents - archiveEventSum)
+ }
+ }
+ }
}
// Note: these tests use the shared-db configs, the test for all (so not only
current) events use the regular db config
diff --git
a/integration-test/src/test/scala/org/apache/pekko/persistence/jdbc/integration/LimitWindowingStreamTest.scala
b/integration-test/src/test/scala/org/apache/pekko/persistence/jdbc/integration/LimitWindowingStreamTest.scala
new file mode 100644
index 0000000..6ffcbfb
--- /dev/null
+++
b/integration-test/src/test/scala/org/apache/pekko/persistence/jdbc/integration/LimitWindowingStreamTest.scala
@@ -0,0 +1,39 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.pekko.persistence.jdbc.integration
+
+import org.apache.pekko.persistence.jdbc.journal.dao.LimitWindowingStreamTest
+import org.apache.pekko.persistence.jdbc.query.{ MysqlCleaner, OracleCleaner,
PostgresCleaner, SqlServerCleaner }
+
+class PostgresLimitWindowingStreamTest
+ extends LimitWindowingStreamTest("postgres-application.conf")
+ with PostgresCleaner
+
+class MySQLLimitWindowingStreamTest
+ extends LimitWindowingStreamTest("mysql-application.conf")
+ with MysqlCleaner
+
+class OracleLimitWindowingStreamTest
+ extends LimitWindowingStreamTest("oracle-application.conf")
+ with OracleCleaner
+
+class SqlServerLimitWindowingStreamTest
+ extends LimitWindowingStreamTest("sqlserver-application.conf")
+ with SqlServerCleaner
---------------------------------------------------------------------
To unsubscribe, e-mail: [email protected]
For additional commands, e-mail: [email protected]